http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java index f3842b0..9fdd447 100644 --- a/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java +++ b/storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.blobstore; import java.io.IOException; @@ -45,7 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class BlobStoreUtils { - private static final String BLOBSTORE_SUBTREE="/blobstore"; + private static final String BLOBSTORE_SUBTREE = "/blobstore"; private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtils.class); @@ -59,7 +54,8 @@ public class BlobStoreUtils { Object port = conf.get(Config.STORM_ZOOKEEPER_PORT); ZookeeperAuthInfo zkAuthInfo = new ZookeeperAuthInfo(conf); CuratorFramework zkClient = CuratorUtils.newCurator(conf, zkServers, port, - (String) conf.get(Config.STORM_ZOOKEEPER_ROOT), zkAuthInfo, type.getDefaultZkAcls(conf)); + (String) conf.get(Config.STORM_ZOOKEEPER_ROOT), zkAuthInfo, + type.getDefaultZkAcls(conf)); zkClient.start(); return zkClient; } @@ -94,7 +90,7 @@ public class BlobStoreUtils { int latestSeqNumber = getLatestSequenceNumber(stateInfoList); LOG.debug("getNimbodesWithLatestSequenceNumberOfBlob stateInfo {} version {}", stateInfoList, latestSeqNumber); // Get the nimbodes with the latest version - for(String state : stateInfoList) { + for (String state : stateInfoList) { BlobKeySequenceInfo sequenceInfo = normalizeNimbusHostPortSequenceNumberInfo(state); if (latestSeqNumber == Integer.parseInt(sequenceInfo.getSequenceNumber())) { nimbusInfoSet.add(NimbusInfo.parse(sequenceInfo.getNimbusHostPort())); @@ -122,14 +118,14 @@ public class BlobStoreUtils { // Download missing blobs from potential nimbodes public static boolean downloadMissingBlob(Map<String, Object> conf, BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos) - throws TTransportException { + throws TTransportException { ReadableBlobMeta rbm; ClientBlobStore remoteBlobStore; InputStreamWithMeta in; boolean isSuccess = false; LOG.debug("Download blob NimbusInfos {}", nimbusInfos); for (NimbusInfo nimbusInfo : nimbusInfos) { - if(isSuccess) { + if (isSuccess) { break; } LOG.debug("Download blob key: {}, NimbusInfo {}", key, nimbusInfo); @@ -171,7 +167,7 @@ public class BlobStoreUtils { // Download updated blobs from potential nimbodes public static boolean downloadUpdatedBlob(Map<String, Object> conf, BlobStore blobStore, String key, Set<NimbusInfo> nimbusInfos) - throws TTransportException { + throws TTransportException { ClientBlobStore remoteBlobStore; InputStreamWithMeta in; AtomicOutputStream out; @@ -233,7 +229,8 @@ public class BlobStoreUtils { cb.createStateInZookeeper(key); } - public static void updateKeyForBlobStore (Map<String, Object> conf, BlobStore blobStore, CuratorFramework zkClient, String key, NimbusInfo nimbusDetails) { + public static void updateKeyForBlobStore(Map<String, Object> conf, BlobStore blobStore, CuratorFramework zkClient, String key, + NimbusInfo nimbusDetails) { try { // Most of clojure tests currently try to access the blobs using getBlob. Since, updateKeyForBlobStore // checks for updating the correct version of the blob as a part of nimbus ha before performing any
http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/blobstore/BlobSynchronizer.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/BlobSynchronizer.java b/storm-server/src/main/java/org/apache/storm/blobstore/BlobSynchronizer.java index 193ccac..19351b1 100644 --- a/storm-server/src/main/java/org/apache/storm/blobstore/BlobSynchronizer.java +++ b/storm-server/src/main/java/org/apache/storm/blobstore/BlobSynchronizer.java @@ -1,30 +1,24 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.blobstore; import java.nio.channels.ClosedByInterruptException; import java.util.HashSet; import java.util.Map; import java.util.Set; - +import org.apache.curator.framework.CuratorFramework; import org.apache.storm.generated.KeyNotFoundException; import org.apache.storm.nimbus.NimbusInfo; -import org.apache.curator.framework.CuratorFramework; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,14 +44,6 @@ public class BlobSynchronizer { this.nimbusInfo = nimbusInfo; } - public void setZookeeperKeySet(Set<String> zookeeperKeySet) { - this.zookeeperKeySet = zookeeperKeySet; - } - - public void setBlobStoreKeySet(Set<String> blobStoreKeySet) { - this.blobStoreKeySet = blobStoreKeySet; - } - public void setZkClient(CuratorFramework zkClient) { this.zkClient = zkClient; } @@ -68,19 +54,28 @@ public class BlobSynchronizer { return keySet; } + public void setBlobStoreKeySet(Set<String> blobStoreKeySet) { + this.blobStoreKeySet = blobStoreKeySet; + } + public Set<String> getZookeeperKeySet() { Set<String> keySet = new HashSet<String>(); keySet.addAll(zookeeperKeySet); return keySet; } + public void setZookeeperKeySet(Set<String> zookeeperKeySet) { + this.zookeeperKeySet = zookeeperKeySet; + } + public synchronized void syncBlobs() { try { - LOG.debug("Sync blobs - blobstore keys {}, zookeeper keys {}",getBlobStoreKeySet(), getZookeeperKeySet()); + LOG.debug("Sync blobs - blobstore keys {}, zookeeper keys {}", getBlobStoreKeySet(), getZookeeperKeySet()); deleteKeySetFromBlobStoreNotOnZookeeper(getBlobStoreKeySet(), getZookeeperKeySet()); updateKeySetForBlobStore(getBlobStoreKeySet()); Set<String> keySetToDownload = getKeySetToDownload(getBlobStoreKeySet(), getZookeeperKeySet()); - LOG.debug("Key set Blobstore-> Zookeeper-> DownloadSet {}-> {}-> {}", getBlobStoreKeySet(), getZookeeperKeySet(), keySetToDownload); + LOG.debug("Key set Blobstore-> Zookeeper-> DownloadSet {}-> {}-> {}", getBlobStoreKeySet(), getZookeeperKeySet(), + keySetToDownload); for (String key : keySetToDownload) { try { @@ -93,16 +88,16 @@ public class BlobSynchronizer { LOG.debug("Detected deletion for the key {} while downloading - skipping download", key); } } - } catch(InterruptedException | ClosedByInterruptException exp) { + } catch (InterruptedException | ClosedByInterruptException exp) { LOG.error("Interrupt Exception {}", exp); - } catch(Exception exp) { + } catch (Exception exp) { throw new RuntimeException(exp); } } public void deleteKeySetFromBlobStoreNotOnZookeeper(Set<String> keySetBlobStore, Set<String> keySetZookeeper) throws Exception { if (keySetBlobStore.removeAll(keySetZookeeper) - || (keySetZookeeper.isEmpty() && !keySetBlobStore.isEmpty())) { + || (keySetZookeeper.isEmpty() && !keySetBlobStore.isEmpty())) { LOG.debug("Key set to delete in blobstore {}", keySetBlobStore); for (String key : keySetBlobStore) { blobStore.deleteBlob(key, BlobStoreUtils.getNimbusSubject()); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/blobstore/FileBlobStoreImpl.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/FileBlobStoreImpl.java b/storm-server/src/main/java/org/apache/storm/blobstore/FileBlobStoreImpl.java index 20a389f..782a5bf 100644 --- a/storm-server/src/main/java/org/apache/storm/blobstore/FileBlobStoreImpl.java +++ b/storm-server/src/main/java/org/apache/storm/blobstore/FileBlobStoreImpl.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.blobstore; import com.google.common.annotations.VisibleForTesting; @@ -34,7 +29,6 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Timer; import java.util.TimerTask; - import org.apache.storm.Config; import org.apache.storm.utils.ObjectReader; import org.slf4j.Logger; @@ -48,71 +42,8 @@ public class FileBlobStoreImpl { private static final int BUCKETS = 1024; private static final Logger LOG = LoggerFactory.getLogger(FileBlobStoreImpl.class); private static final Timer timer = new Timer("FileBlobStore cleanup thread", true); - - public class KeyInHashDirIterator implements Iterator<String> { - private int currentBucket = 0; - private Iterator<String> it = null; - private String next = null; - - public KeyInHashDirIterator() throws IOException { - primeNext(); - } - - private void primeNext() throws IOException { - while (it == null && currentBucket < BUCKETS) { - String name = String.valueOf(currentBucket); - File dir = new File(fullPath, name); - try { - it = listKeys(dir); - } catch (FileNotFoundException e) { - it = null; - } - if (it == null || !it.hasNext()) { - it = null; - currentBucket++; - } else { - next = it.next(); - } - } - } - - @Override - public boolean hasNext() { - return next != null; - } - - @Override - public String next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - String current = next; - next = null; - if (it != null) { - if (!it.hasNext()) { - it = null; - currentBucket++; - try { - primeNext(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } else { - next = it.next(); - } - } - return current; - } - - @Override - public void remove() { - throw new UnsupportedOperationException("Delete Not Supported"); - } - } - private File fullPath; private TimerTask cleanup = null; - public FileBlobStoreImpl(File path, Map<String, Object> conf) throws IOException { LOG.info("Creating new blob store based in {}", path); fullPath = path; @@ -185,9 +116,9 @@ public class FileBlobStoreImpl { @VisibleForTesting File getKeyDir(String key) { - String hash = String.valueOf(Math.abs((long)key.hashCode()) % BUCKETS); + String hash = String.valueOf(Math.abs((long) key.hashCode()) % BUCKETS); File ret = new File(new File(fullPath, hash), key); - LOG.debug("{} Looking for {} in {}", new Object[]{fullPath, key, hash}); + LOG.debug("{} Looking for {} in {}", new Object[]{ fullPath, key, hash }); return ret; } @@ -203,7 +134,7 @@ public class FileBlobStoreImpl { try { keyDir.delete(); } catch (Exception e) { - LOG.warn("Could not delete "+keyDir+" will try again later"); + LOG.warn("Could not delete " + keyDir + " will try again later"); } } while (i.hasNext()) { @@ -221,12 +152,12 @@ public class FileBlobStoreImpl { ArrayList<LocalFsBlobStoreFile> ret = new ArrayList<LocalFsBlobStoreFile>(); File[] files = path.listFiles(); if (files != null) { - for (File sub: files) { + for (File sub : files) { try { ret.add(new LocalFsBlobStoreFile(sub.getParentFile(), sub.getName())); } catch (IllegalArgumentException e) { //Ignored the file did not match - LOG.warn("Found an unexpected file in {} {}",path, sub.getName()); + LOG.warn("Found an unexpected file in {} {}", path, sub.getName()); } } } @@ -267,4 +198,65 @@ public class FileBlobStoreImpl { cleanup = null; } } + + public class KeyInHashDirIterator implements Iterator<String> { + private int currentBucket = 0; + private Iterator<String> it = null; + private String next = null; + + public KeyInHashDirIterator() throws IOException { + primeNext(); + } + + private void primeNext() throws IOException { + while (it == null && currentBucket < BUCKETS) { + String name = String.valueOf(currentBucket); + File dir = new File(fullPath, name); + try { + it = listKeys(dir); + } catch (FileNotFoundException e) { + it = null; + } + if (it == null || !it.hasNext()) { + it = null; + currentBucket++; + } else { + next = it.next(); + } + } + } + + @Override + public boolean hasNext() { + return next != null; + } + + @Override + public String next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + String current = next; + next = null; + if (it != null) { + if (!it.hasNext()) { + it = null; + currentBucket++; + try { + primeNext(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + next = it.next(); + } + } + return current; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Delete Not Supported"); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/blobstore/KeySequenceNumber.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/KeySequenceNumber.java b/storm-server/src/main/java/org/apache/storm/blobstore/KeySequenceNumber.java index 570e0ad..31ffae9 100644 --- a/storm-server/src/main/java/org/apache/storm/blobstore/KeySequenceNumber.java +++ b/storm-server/src/main/java/org/apache/storm/blobstore/KeySequenceNumber.java @@ -1,28 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.blobstore; import java.nio.ByteBuffer; -import java.util.TreeSet; import java.util.List; -import java.util.Map; - +import java.util.TreeSet; import org.apache.curator.framework.CuratorFramework; import org.apache.storm.generated.KeyNotFoundException; import org.apache.storm.nimbus.NimbusInfo; @@ -119,7 +111,7 @@ import org.slf4j.LoggerFactory; */ public class KeySequenceNumber { private static final Logger LOG = LoggerFactory.getLogger(KeySequenceNumber.class); - private final String BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE="/blobstoremaxkeysequencenumber"; + private final String BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE = "/blobstoremaxkeysequencenumber"; private final String key; private final NimbusInfo nimbusInfo; private final int INT_CAPACITY = 4; @@ -138,7 +130,7 @@ public class KeySequenceNumber { zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key); zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key, - ByteBuffer.allocate(INT_CAPACITY).putInt(INITIAL_SEQUENCE_NUMBER).array()); + ByteBuffer.allocate(INT_CAPACITY).putInt(INITIAL_SEQUENCE_NUMBER).array()); return INITIAL_SEQUENCE_NUMBER; } @@ -157,7 +149,7 @@ public class KeySequenceNumber { // if not assign the highest sequence number. for (String stateInfo : stateInfoList) { sequenceNumbers.add(Integer.parseInt(BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo(stateInfo) - .getSequenceNumber())); + .getSequenceNumber())); } // Update scenario 2 and 3 explain the code logic written here @@ -201,7 +193,7 @@ public class KeySequenceNumber { // there's a race condition with a delete: either blobstore or blobstoremaxsequence // this should be thrown to the caller to indicate that the key is invalid now throw new KeyNotFoundException(key); - } catch(Exception e) { + } catch (Exception e) { // in other case, just set this to 0 to trigger re-sync later LOG.error("Exception {}", e); return INITIAL_SEQUENCE_NUMBER - 1; @@ -210,8 +202,8 @@ public class KeySequenceNumber { private boolean checkIfStateContainsCurrentNimbusHost(List<String> stateInfoList, NimbusInfo nimbusInfo) { boolean containsNimbusHost = false; - for(String stateInfo:stateInfoList) { - if(stateInfo.contains(nimbusInfo.getHost())) { + for (String stateInfo : stateInfoList) { + if (stateInfo.contains(nimbusInfo.getHost())) { containsNimbusHost = true; break; } @@ -221,11 +213,11 @@ public class KeySequenceNumber { private void incrementMaxSequenceNumber(CuratorFramework zkClient, int count) throws Exception { zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key, - ByteBuffer.allocate(INT_CAPACITY).putInt(count + 1).array()); + ByteBuffer.allocate(INT_CAPACITY).putInt(count + 1).array()); } private int getMaxSequenceNumber(CuratorFramework zkClient) throws Exception { return ByteBuffer.wrap(zkClient.getData() - .forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key)).getInt(); + .forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key)).getInt(); } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java index 831fb78..6da1f47 100644 --- a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java +++ b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java @@ -1,55 +1,46 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.blobstore; +import com.google.common.annotations.VisibleForTesting; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.security.auth.Subject; +import org.apache.curator.framework.CuratorFramework; import org.apache.storm.cluster.DaemonType; -import org.apache.storm.generated.SettableBlobMeta; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.KeyAlreadyExistsException; import org.apache.storm.generated.KeyNotFoundException; import org.apache.storm.generated.ReadableBlobMeta; - +import org.apache.storm.generated.SettableBlobMeta; import org.apache.storm.nimbus.NimbusInfo; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.Utils; import org.apache.zookeeper.KeeperException.NoNodeException; -import org.apache.curator.framework.CuratorFramework; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.security.auth.Subject; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.FileNotFoundException; -import java.io.InputStream; - -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - import static org.apache.storm.blobstore.BlobStoreAclHandler.ADMIN; import static org.apache.storm.blobstore.BlobStoreAclHandler.READ; import static org.apache.storm.blobstore.BlobStoreAclHandler.WRITE; -import com.google.common.annotations.VisibleForTesting; - /** * Provides a local file system backed blob store implementation for Nimbus. * @@ -72,11 +63,11 @@ public class LocalFsBlobStore extends BlobStore { public static final Logger LOG = LoggerFactory.getLogger(LocalFsBlobStore.class); private static final String DATA_PREFIX = "data_"; private static final String META_PREFIX = "meta_"; - protected BlobStoreAclHandler _aclHandler; private final String BLOBSTORE_SUBTREE = "/blobstore/"; + private final int allPermissions = READ | WRITE | ADMIN; + protected BlobStoreAclHandler _aclHandler; private NimbusInfo nimbusInfo; private FileBlobStoreImpl fbs; - private final int allPermissions = READ | WRITE | ADMIN; private Map<String, Object> conf; private CuratorFramework zkClient; @@ -98,22 +89,23 @@ public class LocalFsBlobStore extends BlobStore { } @Override - public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException { + public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, + KeyAlreadyExistsException { LOG.debug("Creating Blob for key {}", key); validateKey(key); _aclHandler.normalizeSettableBlobMeta(key, meta, who, allPermissions); BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl()); _aclHandler.hasPermissions(meta.get_acl(), allPermissions, who, key); - if (fbs.exists(DATA_PREFIX+key)) { + if (fbs.exists(DATA_PREFIX + key)) { throw new KeyAlreadyExistsException(key); } BlobStoreFileOutputStream mOut = null; try { - mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX+key, true)); + mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX + key, true)); mOut.write(Utils.thriftSerialize(meta)); mOut.close(); mOut = null; - return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX+key, true)); + return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX + key, true)); } catch (IOException e) { throw new RuntimeException(e); } finally { @@ -132,7 +124,7 @@ public class LocalFsBlobStore extends BlobStore { validateKey(key); checkPermission(key, who, WRITE); try { - return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX+key, false)); + return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX + key, false)); } catch (IOException e) { throw new RuntimeException(e); } @@ -141,14 +133,14 @@ public class LocalFsBlobStore extends BlobStore { private SettableBlobMeta getStoredBlobMeta(String key) throws KeyNotFoundException { InputStream in = null; try { - LocalFsBlobStoreFile pf = fbs.read(META_PREFIX+key); + LocalFsBlobStoreFile pf = fbs.read(META_PREFIX + key); try { in = pf.getInputStream(); } catch (FileNotFoundException fnf) { throw new KeyNotFoundException(key); } ByteArrayOutputStream out = new ByteArrayOutputStream(); - byte [] buffer = new byte[2048]; + byte[] buffer = new byte[2048]; int len; while ((len = in.read(buffer)) > 0) { out.write(buffer, 0, len); @@ -172,7 +164,7 @@ public class LocalFsBlobStore extends BlobStore { @Override public ReadableBlobMeta getBlobMeta(String key, Subject who) throws AuthorizationException, KeyNotFoundException { validateKey(key); - if(!checkForBlobOrDownload(key)) { + if (!checkForBlobOrDownload(key)) { checkForBlobUpdate(key); } SettableBlobMeta meta = getStoredBlobMeta(key); @@ -180,7 +172,7 @@ public class LocalFsBlobStore extends BlobStore { ReadableBlobMeta rbm = new ReadableBlobMeta(); rbm.set_settable(meta); try { - LocalFsBlobStoreFile pf = fbs.read(DATA_PREFIX+key); + LocalFsBlobStoreFile pf = fbs.read(DATA_PREFIX + key); rbm.set_version(pf.getModTime()); } catch (IOException e) { throw new RuntimeException(e); @@ -198,7 +190,7 @@ public class LocalFsBlobStore extends BlobStore { _aclHandler.hasPermissions(orig.get_acl(), ADMIN, who, key); BlobStoreFileOutputStream mOut = null; try { - mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX+key, false)); + mOut = new BlobStoreFileOutputStream(fbs.write(META_PREFIX + key, false)); mOut.write(Utils.thriftSerialize(meta)); mOut.close(); mOut = null; @@ -233,7 +225,7 @@ public class LocalFsBlobStore extends BlobStore { // able to delete the blob without checking meta's ACL // skip checking everything and continue deleting local files LOG.debug("Given subject is eligible to delete key without checking ACL, skipping... key: {} subject: {}", - key, who); + key, who); } try { @@ -265,13 +257,13 @@ public class LocalFsBlobStore extends BlobStore { @Override public InputStreamWithMeta getBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException { validateKey(key); - if(!checkForBlobOrDownload(key)) { + if (!checkForBlobOrDownload(key)) { checkForBlobUpdate(key); } SettableBlobMeta meta = getStoredBlobMeta(key); _aclHandler.hasPermissions(meta.get_acl(), READ, who, key); try { - return new BlobStoreFileInputStream(fbs.read(DATA_PREFIX+key)); + return new BlobStoreFileInputStream(fbs.read(DATA_PREFIX + key)); } catch (IOException e) { throw new RuntimeException(e); } @@ -314,7 +306,7 @@ public class LocalFsBlobStore extends BlobStore { @Override public int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException { throw new UnsupportedOperationException("For local file system blob store the update blobs function does not work. " + - "Please use HDFS blob store to make this feature available."); + "Please use HDFS blob store to make this feature available."); } //This additional check and download is for nimbus high availability in case you have more than one nimbus @@ -347,7 +339,7 @@ public class LocalFsBlobStore extends BlobStore { public void fullCleanup(long age) throws IOException { fbs.fullCleanup(age); } - + @VisibleForTesting File getKeyDataDir(String key) { return fbs.getKeyDir(DATA_PREFIX + key); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStoreFile.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStoreFile.java b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStoreFile.java index 1b7c969..d160ba6 100644 --- a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStoreFile.java +++ b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStoreFile.java @@ -1,23 +1,16 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.blobstore; -import org.apache.storm.generated.SettableBlobMeta; +package org.apache.storm.blobstore; import java.io.File; import java.io.FileInputStream; @@ -28,14 +21,15 @@ import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.StandardCopyOption; import java.util.regex.Matcher; +import org.apache.storm.generated.SettableBlobMeta; public class LocalFsBlobStoreFile extends BlobStoreFile { private final String _key; private final boolean _isTmp; private final File _path; - private Long _modTime = null; private final boolean _mustBeNew; + private Long _modTime = null; private SettableBlobMeta meta; public LocalFsBlobStoreFile(File base, String name) { @@ -44,7 +38,7 @@ public class LocalFsBlobStoreFile extends BlobStoreFile { } else { Matcher m = TMP_NAME_PATTERN.matcher(name); if (!m.matches()) { - throw new IllegalArgumentException("File name does not match '"+name+"' !~ "+TMP_NAME_PATTERN); + throw new IllegalArgumentException("File name does not match '" + name + "' !~ " + TMP_NAME_PATTERN); } _isTmp = true; } @@ -58,7 +52,7 @@ public class LocalFsBlobStoreFile extends BlobStoreFile { _isTmp = isTmp; _mustBeNew = mustBeNew; if (_isTmp) { - _path = new File(base, System.currentTimeMillis()+TMP_EXT); + _path = new File(base, System.currentTimeMillis() + TMP_EXT); } else { _path = new File(base, BlobStoreFile.BLOBSTORE_DATA_FILE); } @@ -109,7 +103,7 @@ public class LocalFsBlobStoreFile extends BlobStoreFile { success = _path.createNewFile(); } if (!success) { - throw new IOException(_path+" already exists"); + throw new IOException(_path + " already exists"); } return new FileOutputStream(_path); } @@ -137,18 +131,18 @@ public class LocalFsBlobStoreFile extends BlobStoreFile { } @Override - public SettableBlobMeta getMetadata () { + public SettableBlobMeta getMetadata() { return meta; } @Override - public void setMetadata (SettableBlobMeta meta) { + public void setMetadata(SettableBlobMeta meta) { this.meta = meta; } @Override public String toString() { - return _path+":"+(_isTmp ? "tmp": BlobStoreFile.BLOBSTORE_DATA_FILE)+":"+_key; + return _path + ":" + (_isTmp ? "tmp" : BlobStoreFile.BLOBSTORE_DATA_FILE) + ":" + _key; } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java b/storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java index 733d2ab..8e2ae3c 100644 --- a/storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java +++ b/storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.container; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java b/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java index 4079215..a68e18e 100644 --- a/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java +++ b/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.container.cgroup; @@ -34,7 +28,6 @@ import java.util.Map; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; - import org.apache.storm.Config; import org.apache.storm.DaemonConfig; import org.apache.storm.container.ResourceIsolationInterface; @@ -50,17 +43,40 @@ import org.slf4j.LoggerFactory; public class CgroupManager implements ResourceIsolationInterface { private static final Logger LOG = LoggerFactory.getLogger(CgroupManager.class); - + private static final Pattern MEMINFO_PATTERN = Pattern.compile("^([^:\\s]+):\\s*([0-9]+)\\s*kB$"); private CgroupCenter center; - private Hierarchy hierarchy; - private CgroupCommon rootCgroup; - private String rootDir; - private Map<String, Object> conf; + static long getMemInfoFreeMb() throws IOException { + //MemFree: 14367072 kB + //Buffers: 536512 kB + //Cached: 1192096 kB + // MemFree + Buffers + Cached + long memFree = 0; + long buffers = 0; + long cached = 0; + try (BufferedReader in = new BufferedReader(new FileReader("/proc/meminfo"))) { + String line = null; + while ((line = in.readLine()) != null) { + Matcher match = MEMINFO_PATTERN.matcher(line); + if (match.matches()) { + String tag = match.group(1); + if (tag.equalsIgnoreCase("MemFree")) { + memFree = Long.parseLong(match.group(2)); + } else if (tag.equalsIgnoreCase("Buffers")) { + buffers = Long.parseLong(match.group(2)); + } else if (tag.equalsIgnoreCase("Cached")) { + cached = Long.parseLong(match.group(2)); + } + } + } + } + return (memFree + buffers + cached) / 1024; + } + /** * initialize data structures. * @@ -102,7 +118,7 @@ public class CgroupManager implements ResourceIsolationInterface { Set<SubSystemType> types = new HashSet<>(); types.add(SubSystemType.cpu); this.hierarchy = new Hierarchy(DaemonConfig.getCgroupStormHierarchyName(conf), types, - DaemonConfig.getCgroupStormHierarchyDir(conf)); + DaemonConfig.getCgroupStormHierarchyDir(conf)); } this.rootCgroup = new CgroupCommon(this.rootDir, this.hierarchy, this.hierarchy.getRootCgroups()); @@ -194,7 +210,7 @@ public class CgroupManager implements ResourceIsolationInterface { Set<Integer> tasks = workerGroup.getTasks(); if (!tasks.isEmpty()) { throw new Exception("Cannot correctly shutdown worker CGroup " + workerId + "tasks " + tasks - + " still running!"); + + " still running!"); } this.center.deleteCgroup(workerGroup); } catch (Exception e) { @@ -254,35 +270,6 @@ public class CgroupManager implements ResourceIsolationInterface { return memCore.getPhysicalUsage(); } - private static final Pattern MEMINFO_PATTERN = Pattern.compile("^([^:\\s]+):\\s*([0-9]+)\\s*kB$"); - - static long getMemInfoFreeMb() throws IOException { - //MemFree: 14367072 kB - //Buffers: 536512 kB - //Cached: 1192096 kB - // MemFree + Buffers + Cached - long memFree = 0; - long buffers = 0; - long cached = 0; - try (BufferedReader in = new BufferedReader(new FileReader("/proc/meminfo"))) { - String line = null; - while ((line = in.readLine()) != null) { - Matcher match = MEMINFO_PATTERN.matcher(line); - if (match.matches()) { - String tag = match.group(1); - if (tag.equalsIgnoreCase("MemFree")) { - memFree = Long.parseLong(match.group(2)); - } else if (tag.equalsIgnoreCase("Buffers")) { - buffers = Long.parseLong(match.group(2)); - } else if (tag.equalsIgnoreCase("Cached")) { - cached = Long.parseLong(match.group(2)); - } - } - } - } - return (memFree + buffers + cached) / 1024; - } - @Override public long getSystemFreeMemoryMb() throws IOException { long rootCgroupLimitFree = Long.MAX_VALUE; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java b/storm-server/src/main/java/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java index 489af28..939c1fc 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java @@ -15,17 +15,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.daemon.drpc; import java.util.concurrent.Semaphore; - import org.apache.storm.generated.DRPCExceptionType; import org.apache.storm.generated.DRPCExecutionException; import org.apache.storm.generated.DRPCRequest; public class BlockingOutstandingRequest extends OutstandingRequest { public static final RequestFactory<BlockingOutstandingRequest> FACTORY = - (function, request) -> new BlockingOutstandingRequest(function, request); + (function, request) -> new BlockingOutstandingRequest(function, request); private Semaphore _sem; private volatile String _result = null; private volatile DRPCExecutionException _e = null; @@ -49,7 +49,7 @@ public class BlockingOutstandingRequest extends OutstandingRequest { if (_e == null) { _e = new DRPCExecutionException("Internal Error: No Result and No Exception"); _e.set_type(DRPCExceptionType.INTERNAL_ERROR); - } + } throw _e; } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java b/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java index 4093961..94d6b3b 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java @@ -15,8 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.daemon.drpc; +import com.codahale.metrics.Meter; +import com.google.common.annotations.VisibleForTesting; import java.security.Principal; import java.util.HashMap; import java.util.Map; @@ -26,7 +29,6 @@ import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; - import org.apache.storm.DaemonConfig; import org.apache.storm.daemon.StormCommon; import org.apache.storm.generated.AuthorizationException; @@ -42,26 +44,48 @@ import org.apache.storm.utils.ObjectReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.Meter; -import com.google.common.annotations.VisibleForTesting; - public class DRPC implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DRPC.class); - private static final DRPCRequest NOTHING_REQUEST = new DRPCRequest("",""); + private static final DRPCRequest NOTHING_REQUEST = new DRPCRequest("", ""); private static final DRPCExecutionException TIMED_OUT = new DRPCExecutionException("Timed Out"); private static final DRPCExecutionException SHUT_DOWN = new DRPCExecutionException("Server Shutting Down"); private static final DRPCExecutionException DEFAULT_FAILED = new DRPCExecutionException("Request failed"); - static { - TIMED_OUT.set_type(DRPCExceptionType.SERVER_TIMEOUT); - SHUT_DOWN.set_type(DRPCExceptionType.SERVER_SHUTDOWN); - DEFAULT_FAILED.set_type(DRPCExceptionType.FAILED_REQUEST); - } private static final Meter meterServerTimedOut = StormMetricsRegistry.registerMeter("drpc:num-server-timedout-requests"); private static final Meter meterExecuteCalls = StormMetricsRegistry.registerMeter("drpc:num-execute-calls"); private static final Meter meterResultCalls = StormMetricsRegistry.registerMeter("drpc:num-result-calls"); private static final Meter meterFailRequestCalls = StormMetricsRegistry.registerMeter("drpc:num-failRequest-calls"); private static final Meter meterFetchRequestCalls = StormMetricsRegistry.registerMeter("drpc:num-fetchRequest-calls"); - + + static { + TIMED_OUT.set_type(DRPCExceptionType.SERVER_TIMEOUT); + SHUT_DOWN.set_type(DRPCExceptionType.SERVER_SHUTDOWN); + DEFAULT_FAILED.set_type(DRPCExceptionType.FAILED_REQUEST); + } + + //Waiting to be fetched + private final ConcurrentHashMap<String, ConcurrentLinkedQueue<OutstandingRequest>> _queues = + new ConcurrentHashMap<>(); + //Waiting to be returned + private final ConcurrentHashMap<String, OutstandingRequest> _requests = + new ConcurrentHashMap<>(); + private final Timer _timer = new Timer(); + private final AtomicLong _ctr = new AtomicLong(0); + private final IAuthorizer _auth; + + public DRPC(Map<String, Object> conf) { + this(mkAuthorizationHandler((String) conf.get(DaemonConfig.DRPC_AUTHORIZER), conf), + ObjectReader.getInt(conf.get(DaemonConfig.DRPC_REQUEST_TIMEOUT_SECS), 600) * 1000); + } + public DRPC(IAuthorizer auth, long timeoutMs) { + _auth = auth; + _timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + cleanupAll(timeoutMs, TIMED_OUT); + } + }, timeoutMs / 2, timeoutMs / 2); + } + private static IAuthorizer mkAuthorizationHandler(String klassname, Map<String, Object> conf) { try { return StormCommon.mkAuthorizationHandler(klassname, conf); @@ -76,7 +100,7 @@ public class DRPC implements AutoCloseable { private static void logAccess(ReqContext reqContext, String operation, String function) { ThriftAccessLogger.logAccessFunction(reqContext.requestID(), reqContext.remoteAddress(), reqContext.principal(), operation, - function); + function); } @VisibleForTesting @@ -100,33 +124,7 @@ public class DRPC implements AutoCloseable { } } } - - //Waiting to be fetched - private final ConcurrentHashMap<String, ConcurrentLinkedQueue<OutstandingRequest>> _queues = - new ConcurrentHashMap<>(); - //Waiting to be returned - private final ConcurrentHashMap<String, OutstandingRequest> _requests = - new ConcurrentHashMap<>(); - private final Timer _timer = new Timer(); - private final AtomicLong _ctr = new AtomicLong(0); - private final IAuthorizer _auth; - - public DRPC(Map<String, Object> conf) { - this(mkAuthorizationHandler((String)conf.get(DaemonConfig.DRPC_AUTHORIZER), conf), - ObjectReader.getInt(conf.get(DaemonConfig.DRPC_REQUEST_TIMEOUT_SECS), 600) * 1000); - } - - public DRPC(IAuthorizer auth, long timeoutMs) { - _auth = auth; - _timer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - cleanupAll(timeoutMs, TIMED_OUT); - } - }, timeoutMs/2, timeoutMs/2); - } - - + private void checkAuthorization(String operation, String function) throws AuthorizationException { checkAuthorization(ReqContext.context(), _auth, operation, function); } @@ -134,7 +132,7 @@ public class DRPC implements AutoCloseable { private void checkAuthorizationNoLog(String operation, String function) throws AuthorizationException { checkAuthorization(ReqContext.context(), _auth, operation, function, false); } - + private void cleanup(String id) { OutstandingRequest req = _requests.remove(id); if (req != null && !req.wasFetched()) { @@ -152,7 +150,7 @@ public class DRPC implements AutoCloseable { } } } - + private String nextId() { return String.valueOf(_ctr.incrementAndGet()); } @@ -207,7 +205,8 @@ public class DRPC implements AutoCloseable { } } - public <T extends OutstandingRequest> T execute(String functionName, String funcArgs, RequestFactory<T> factory) throws AuthorizationException { + public <T extends OutstandingRequest> T execute(String functionName, String funcArgs, RequestFactory<T> factory) throws + AuthorizationException { meterExecuteCalls.mark(); checkAuthorization("execute", functionName); String id = nextId(); @@ -218,11 +217,11 @@ public class DRPC implements AutoCloseable { q.add(req); return req; } - + public String executeBlocking(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException { BlockingOutstandingRequest req = execute(functionName, funcArgs, BlockingOutstandingRequest.FACTORY); try { - LOG.debug("Waiting for result {} {}",functionName, funcArgs); + LOG.debug("Waiting for result {} {}", functionName, funcArgs); return req.getResult(); } catch (DRPCExecutionException e) { throw e; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPCThrift.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPCThrift.java b/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPCThrift.java index 2d88e8e..01d2392 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPCThrift.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPCThrift.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.daemon.drpc; import org.apache.storm.generated.AuthorizationException; @@ -52,7 +53,7 @@ public class DRPCThrift implements DistributedRPC.Iface, DistributedRPCInvocatio @Override public String execute(String functionName, String funcArgs) - throws DRPCExecutionException, AuthorizationException { + throws DRPCExecutionException, AuthorizationException { return _drpc.executeBlocking(functionName, funcArgs); } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/drpc/OutstandingRequest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/drpc/OutstandingRequest.java b/storm-server/src/main/java/org/apache/storm/daemon/drpc/OutstandingRequest.java index adb916f..06c596e 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/drpc/OutstandingRequest.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/drpc/OutstandingRequest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.daemon.drpc; import org.apache.storm.generated.DRPCExecutionException; @@ -42,7 +43,7 @@ public abstract class OutstandingRequest { } public boolean wasFetched() { - return _fetched; + return _fetched; } public String getFunction() { @@ -54,5 +55,6 @@ public abstract class OutstandingRequest { } public abstract void returnResult(String result); + public abstract void fail(DRPCExecutionException e); } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/drpc/RequestFactory.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/drpc/RequestFactory.java b/storm-server/src/main/java/org/apache/storm/daemon/drpc/RequestFactory.java index 6a0e610..e6cd799 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/drpc/RequestFactory.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/drpc/RequestFactory.java @@ -15,10 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.daemon.drpc; import org.apache.storm.generated.DRPCRequest; public interface RequestFactory<T extends OutstandingRequest> { - public T mkRequest(String function, DRPCRequest req); + public T mkRequest(String function, DRPCRequest req); } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java b/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java index 85ee27f..abd7cb1 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java @@ -22,7 +22,6 @@ import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.Map; - import org.apache.storm.DaemonConfig; import org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter; import org.apache.storm.daemon.metrics.reporters.PreparableReporter; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java index cd6d069..47e4f24 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.daemon.metrics.reporters; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java index 3ab6a99..4952051 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.daemon.metrics.reporters; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java index c369879..3fc77d1 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.daemon.metrics.reporters; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java index 958a01d..0e1a6e3 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/PreparableReporter.java @@ -1,32 +1,28 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.daemon.metrics.reporters; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Reporter; - import java.io.Closeable; import java.util.Map; public interface PreparableReporter<T extends Reporter & Closeable> { - void prepare(MetricRegistry metricsRegistry, Map<String, Object> topoConf); - void start(); - void stop(); + void prepare(MetricRegistry metricsRegistry, Map<String, Object> topoConf); + + void start(); + + void stop(); }
