Repository: storm
Updated Branches:
  refs/heads/master 9861f9a71 -> 64af629a1


[STORM-3036] Add isRemoteBlobExists RPC interface for deciding if remote blob 
exists.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/26c4bec9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/26c4bec9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/26c4bec9

Branch: refs/heads/master
Commit: 26c4bec99efec800b3824db67ef88f5d856354a7
Parents: 9e97a07
Author: chenyuzhao <[email protected]>
Authored: Fri May 25 00:34:14 2018 +0800
Committer: chenyuzhao <[email protected]>
Committed: Fri May 25 00:34:14 2018 +0800

----------------------------------------------------------------------
 .../hdfs/blobstore/HdfsClientBlobStore.java     |  10 +
 .../apache/storm/blobstore/ClientBlobStore.java |   6 +
 .../blobstore/LocalModeClientBlobStore.java     |  10 +
 .../apache/storm/blobstore/NimbusBlobStore.java |  11 +
 .../jvm/org/apache/storm/generated/Nimbus.java  | 986 +++++++++++++++++++
 storm-client/src/py/storm/Nimbus-remote         |   7 +
 storm-client/src/py/storm/Nimbus.py             | 205 ++++
 storm-client/src/storm.thrift                   |   4 +
 .../storm/blobstore/ClientBlobStoreTest.java    |   5 +
 .../java/org/apache/storm/LocalCluster.java     |   5 +
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |  10 +
 .../LocalizedResourceRetentionSet.java          |  18 +-
 .../storm/localizer/AsyncLocalizerTest.java     |   1 +
 13 files changed, 1269 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
 
b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
index c39904c..419cf997 100644
--- 
a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
+++ 
b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
@@ -73,6 +73,16 @@ public class HdfsClientBlobStore extends ClientBlobStore {
     }
 
     @Override
+    public boolean isRemoteBlobExists(String blobKey) throws 
AuthorizationException {
+        try {
+            _blobStore.getBlob(blobKey, null);
+        } catch (KeyNotFoundException e) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
     public void setBlobMetaToExtend(String key, SettableBlobMeta meta)
             throws AuthorizationException, KeyNotFoundException {
         _blobStore.setBlobMeta(key, meta, null);

http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java 
b/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
index 57e8578..f01c74e 100644
--- a/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
@@ -91,6 +91,12 @@ public abstract class ClientBlobStore implements 
Shutdownable, AutoCloseable {
     public abstract ReadableBlobMeta getBlobMeta(String key) throws 
AuthorizationException, KeyNotFoundException;
 
     /**
+     * Check whether the blob with the given key exists in the cluster (i.e. has not been deleted).
+     * @param blobKey blob key
+     */
+    public abstract boolean isRemoteBlobExists(String blobKey) throws 
AuthorizationException;
+
+    /**
      * Client facing API to set the metadata for a blob.
      *
      * @param key  blob key name.

http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-client/src/jvm/org/apache/storm/blobstore/LocalModeClientBlobStore.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/blobstore/LocalModeClientBlobStore.java 
b/storm-client/src/jvm/org/apache/storm/blobstore/LocalModeClientBlobStore.java
index 0f8287c..70ed6c2 100644
--- 
a/storm-client/src/jvm/org/apache/storm/blobstore/LocalModeClientBlobStore.java
+++ 
b/storm-client/src/jvm/org/apache/storm/blobstore/LocalModeClientBlobStore.java
@@ -58,6 +58,16 @@ public class LocalModeClientBlobStore extends 
ClientBlobStore {
     }
 
     @Override
+    public boolean isRemoteBlobExists(String blobKey) throws 
AuthorizationException {
+        try {
+            wrapped.getBlob(blobKey, null);
+        } catch (KeyNotFoundException e) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
     protected void setBlobMetaToExtend(String key, SettableBlobMeta meta) 
throws AuthorizationException, KeyNotFoundException {
         wrapped.setBlobMeta(key, meta, null);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java 
b/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
index 152f1f1..fd58062 100644
--- a/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
@@ -92,6 +92,17 @@ public class NimbusBlobStore extends ClientBlobStore 
implements AutoCloseable {
     }
 
     @Override
+    public boolean isRemoteBlobExists(String blobKey) throws 
AuthorizationException {
+        try {
+            return client.getClient().isRemoteBlobExists(blobKey);
+        } catch (AuthorizationException aze) {
+            throw aze;
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
     protected void setBlobMetaToExtend(String key, SettableBlobMeta meta)
         throws AuthorizationException, KeyNotFoundException {
         try {

http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java 
b/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java
index ac105e1..3955ddc 100644
--- a/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java
+++ b/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java
@@ -166,6 +166,13 @@ public class Nimbus {
 
     public void processWorkerMetrics(WorkerMetrics metrics) throws 
org.apache.thrift.TException;
 
+    /**
+     * Check whether the blob with the given key exists in the cluster (i.e. has not been removed).
+     * 
+     * @param blobKey
+     */
+    public boolean isRemoteBlobExists(java.lang.String blobKey) throws 
AuthorizationException, org.apache.thrift.TException;
+
   }
 
   public interface AsyncIface {
@@ -270,6 +277,8 @@ public class Nimbus {
 
     public void processWorkerMetrics(WorkerMetrics metrics, 
org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws 
org.apache.thrift.TException;
 
+    public void isRemoteBlobExists(java.lang.String blobKey, 
org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> resultHandler) 
throws org.apache.thrift.TException;
+
   }
 
   public static class Client extends org.apache.thrift.TServiceClient 
implements Iface {
@@ -1621,6 +1630,32 @@ public class Nimbus {
       return;
     }
 
+    public boolean isRemoteBlobExists(java.lang.String blobKey) throws 
AuthorizationException, org.apache.thrift.TException
+    {
+      send_isRemoteBlobExists(blobKey);
+      return recv_isRemoteBlobExists();
+    }
+
+    public void send_isRemoteBlobExists(java.lang.String blobKey) throws 
org.apache.thrift.TException
+    {
+      isRemoteBlobExists_args args = new isRemoteBlobExists_args();
+      args.set_blobKey(blobKey);
+      sendBase("isRemoteBlobExists", args);
+    }
+
+    public boolean recv_isRemoteBlobExists() throws AuthorizationException, 
org.apache.thrift.TException
+    {
+      isRemoteBlobExists_result result = new isRemoteBlobExists_result();
+      receiveBase(result, "isRemoteBlobExists");
+      if (result.is_set_success()) {
+        return result.success;
+      }
+      if (result.aze != null) {
+        throw result.aze;
+      }
+      throw new 
org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT,
 "isRemoteBlobExists failed: unknown result");
+    }
+
   }
   public static class AsyncClient extends org.apache.thrift.async.TAsyncClient 
implements AsyncIface {
     public static class Factory implements 
org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
@@ -3317,6 +3352,38 @@ public class Nimbus {
       }
     }
 
+    public void isRemoteBlobExists(java.lang.String blobKey, 
org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> resultHandler) 
throws org.apache.thrift.TException {
+      checkReady();
+      isRemoteBlobExists_call method_call = new 
isRemoteBlobExists_call(blobKey, resultHandler, this, ___protocolFactory, 
___transport);
+      this.___currentMethod = method_call;
+      ___manager.call(method_call);
+    }
+
+    public static class isRemoteBlobExists_call extends 
org.apache.thrift.async.TAsyncMethodCall<java.lang.Boolean> {
+      private java.lang.String blobKey;
+      public isRemoteBlobExists_call(java.lang.String blobKey, 
org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> resultHandler, 
org.apache.thrift.async.TAsyncClient client, 
org.apache.thrift.protocol.TProtocolFactory protocolFactory, 
org.apache.thrift.transport.TNonblockingTransport transport) throws 
org.apache.thrift.TException {
+        super(client, protocolFactory, transport, resultHandler, false);
+        this.blobKey = blobKey;
+      }
+
+      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws 
org.apache.thrift.TException {
+        prot.writeMessageBegin(new 
org.apache.thrift.protocol.TMessage("isRemoteBlobExists", 
org.apache.thrift.protocol.TMessageType.CALL, 0));
+        isRemoteBlobExists_args args = new isRemoteBlobExists_args();
+        args.set_blobKey(blobKey);
+        args.write(prot);
+        prot.writeMessageEnd();
+      }
+
+      public java.lang.Boolean getResult() throws AuthorizationException, 
org.apache.thrift.TException {
+        if (getState() != 
org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+          throw new java.lang.IllegalStateException("Method call not 
finished!");
+        }
+        org.apache.thrift.transport.TMemoryInputTransport memoryTransport = 
new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+        org.apache.thrift.protocol.TProtocol prot = 
client.getProtocolFactory().getProtocol(memoryTransport);
+        return (new Client(prot)).recv_isRemoteBlobExists();
+      }
+    }
+
   }
 
   public static class Processor<I extends Iface> extends 
org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
@@ -3380,6 +3447,7 @@ public class Nimbus {
       processMap.put("sendSupervisorWorkerHeartbeats", new 
sendSupervisorWorkerHeartbeats());
       processMap.put("sendSupervisorWorkerHeartbeat", new 
sendSupervisorWorkerHeartbeat());
       processMap.put("processWorkerMetrics", new processWorkerMetrics());
+      processMap.put("isRemoteBlobExists", new isRemoteBlobExists());
       return processMap;
     }
 
@@ -4868,6 +4936,36 @@ public class Nimbus {
       }
     }
 
+    public static class isRemoteBlobExists<I extends Iface> extends 
org.apache.thrift.ProcessFunction<I, isRemoteBlobExists_args> {
+      public isRemoteBlobExists() {
+        super("isRemoteBlobExists");
+      }
+
+      public isRemoteBlobExists_args getEmptyArgsInstance() {
+        return new isRemoteBlobExists_args();
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      @Override
+      protected boolean handleRuntimeExceptions() {
+        return false;
+      }
+
+      public isRemoteBlobExists_result getResult(I iface, 
isRemoteBlobExists_args args) throws org.apache.thrift.TException {
+        isRemoteBlobExists_result result = new isRemoteBlobExists_result();
+        try {
+          result.success = iface.isRemoteBlobExists(args.blobKey);
+          result.set_success_isSet(true);
+        } catch (AuthorizationException aze) {
+          result.aze = aze;
+        }
+        return result;
+      }
+    }
+
   }
 
   public static class AsyncProcessor<I extends AsyncIface> extends 
org.apache.thrift.TBaseAsyncProcessor<I> {
@@ -4931,6 +5029,7 @@ public class Nimbus {
       processMap.put("sendSupervisorWorkerHeartbeats", new 
sendSupervisorWorkerHeartbeats());
       processMap.put("sendSupervisorWorkerHeartbeat", new 
sendSupervisorWorkerHeartbeat());
       processMap.put("processWorkerMetrics", new processWorkerMetrics());
+      processMap.put("isRemoteBlobExists", new isRemoteBlobExists());
       return processMap;
     }
 
@@ -8257,6 +8356,72 @@ public class Nimbus {
       }
     }
 
+    public static class isRemoteBlobExists<I extends AsyncIface> extends 
org.apache.thrift.AsyncProcessFunction<I, isRemoteBlobExists_args, 
java.lang.Boolean> {
+      public isRemoteBlobExists() {
+        super("isRemoteBlobExists");
+      }
+
+      public isRemoteBlobExists_args getEmptyArgsInstance() {
+        return new isRemoteBlobExists_args();
+      }
+
+      public org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> 
getResultHandler(final 
org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final 
int seqid) {
+        final org.apache.thrift.AsyncProcessFunction fcall = this;
+        return new 
org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean>() { 
+          public void onComplete(java.lang.Boolean o) {
+            isRemoteBlobExists_result result = new isRemoteBlobExists_result();
+            result.success = o;
+            result.set_success_isSet(true);
+            try {
+              fcall.sendResponse(fb, result, 
org.apache.thrift.protocol.TMessageType.REPLY,seqid);
+            } catch (org.apache.thrift.transport.TTransportException e) {
+              _LOGGER.error("TTransportException writing to internal frame 
buffer", e);
+              fb.close();
+            } catch (java.lang.Exception e) {
+              _LOGGER.error("Exception writing to internal frame buffer", e);
+              onError(e);
+            }
+          }
+          public void onError(java.lang.Exception e) {
+            byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
+            org.apache.thrift.TSerializable msg;
+            isRemoteBlobExists_result result = new isRemoteBlobExists_result();
+            if (e instanceof AuthorizationException) {
+              result.aze = (AuthorizationException) e;
+              result.set_aze_isSet(true);
+              msg = result;
+            } else if (e instanceof 
org.apache.thrift.transport.TTransportException) {
+              _LOGGER.error("TTransportException inside handler", e);
+              fb.close();
+              return;
+            } else if (e instanceof org.apache.thrift.TApplicationException) {
+              _LOGGER.error("TApplicationException inside handler", e);
+              msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+              msg = (org.apache.thrift.TApplicationException)e;
+            } else {
+              _LOGGER.error("Exception inside handler", e);
+              msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+              msg = new 
org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR,
 e.getMessage());
+            }
+            try {
+              fcall.sendResponse(fb,msg,msgType,seqid);
+            } catch (java.lang.Exception ex) {
+              _LOGGER.error("Exception writing to internal frame buffer", ex);
+              fb.close();
+            }
+          }
+        };
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public void start(I iface, isRemoteBlobExists_args args, 
org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> resultHandler) 
throws org.apache.thrift.TException {
+        iface.isRemoteBlobExists(args.blobKey,resultHandler);
+      }
+    }
+
   }
 
   public static class submitTopology_args implements 
org.apache.thrift.TBase<submitTopology_args, submitTopology_args._Fields>, 
java.io.Serializable, Cloneable, Comparable<submitTopology_args>   {
@@ -52455,4 +52620,825 @@ public class Nimbus {
     }
   }
 
+  public static class isRemoteBlobExists_args implements 
org.apache.thrift.TBase<isRemoteBlobExists_args, 
isRemoteBlobExists_args._Fields>, java.io.Serializable, Cloneable, 
Comparable<isRemoteBlobExists_args>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new 
org.apache.thrift.protocol.TStruct("isRemoteBlobExists_args");
+
+    private static final org.apache.thrift.protocol.TField BLOB_KEY_FIELD_DESC 
= new org.apache.thrift.protocol.TField("blobKey", 
org.apache.thrift.protocol.TType.STRING, (short)1);
+
+    private static final org.apache.thrift.scheme.SchemeFactory 
STANDARD_SCHEME_FACTORY = new isRemoteBlobExists_argsStandardSchemeFactory();
+    private static final org.apache.thrift.scheme.SchemeFactory 
TUPLE_SCHEME_FACTORY = new isRemoteBlobExists_argsTupleSchemeFactory();
+
+    private java.lang.String blobKey; // required
+
+    /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      BLOB_KEY((short)1, "blobKey");
+
+      private static final java.util.Map<java.lang.String, _Fields> byName = 
new java.util.HashMap<java.lang.String, _Fields>();
+
+      static {
+        for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not 
found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 1: // BLOB_KEY
+            return BLOB_KEY;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new 
java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(java.lang.String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final java.lang.String _fieldName;
+
+      _Fields(short thriftId, java.lang.String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public java.lang.String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    public static final java.util.Map<_Fields, 
org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap 
= new java.util.EnumMap<_Fields, 
org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.BLOB_KEY, new 
org.apache.thrift.meta_data.FieldMetaData("blobKey", 
org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+      metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+      
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(isRemoteBlobExists_args.class,
 metaDataMap);
+    }
+
+    public isRemoteBlobExists_args() {
+    }
+
+    public isRemoteBlobExists_args(
+      java.lang.String blobKey)
+    {
+      this();
+      this.blobKey = blobKey;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public isRemoteBlobExists_args(isRemoteBlobExists_args other) {
+      if (other.is_set_blobKey()) {
+        this.blobKey = other.blobKey;
+      }
+    }
+
+    public isRemoteBlobExists_args deepCopy() {
+      return new isRemoteBlobExists_args(this);
+    }
+
+    @Override
+    public void clear() {
+      this.blobKey = null;
+    }
+
+    public java.lang.String get_blobKey() {
+      return this.blobKey;
+    }
+
+    public void set_blobKey(java.lang.String blobKey) {
+      this.blobKey = blobKey;
+    }
+
+    public void unset_blobKey() {
+      this.blobKey = null;
+    }
+
+    /** Returns true if field blobKey is set (has been assigned a value) and 
false otherwise */
+    public boolean is_set_blobKey() {
+      return this.blobKey != null;
+    }
+
+    public void set_blobKey_isSet(boolean value) {
+      if (!value) {
+        this.blobKey = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, java.lang.Object value) {
+      switch (field) {
+      case BLOB_KEY:
+        if (value == null) {
+          unset_blobKey();
+        } else {
+          set_blobKey((java.lang.String)value);
+        }
+        break;
+
+      }
+    }
+
+    public java.lang.Object getFieldValue(_Fields field) {
+      switch (field) {
+      case BLOB_KEY:
+        return get_blobKey();
+
+      }
+      throw new java.lang.IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been 
assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new java.lang.IllegalArgumentException();
+      }
+
+      switch (field) {
+      case BLOB_KEY:
+        return is_set_blobKey();
+      }
+      throw new java.lang.IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(java.lang.Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof isRemoteBlobExists_args)
+        return this.equals((isRemoteBlobExists_args)that);
+      return false;
+    }
+
+    public boolean equals(isRemoteBlobExists_args that) {
+      if (that == null)
+        return false;
+      if (this == that)
+        return true;
+
+      boolean this_present_blobKey = true && this.is_set_blobKey();
+      boolean that_present_blobKey = true && that.is_set_blobKey();
+      if (this_present_blobKey || that_present_blobKey) {
+        if (!(this_present_blobKey && that_present_blobKey))
+          return false;
+        if (!this.blobKey.equals(that.blobKey))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int hashCode = 1;
+
+      hashCode = hashCode * 8191 + ((is_set_blobKey()) ? 131071 : 524287);
+      if (is_set_blobKey())
+        hashCode = hashCode * 8191 + blobKey.hashCode();
+
+      return hashCode;
+    }
+
+    @Override
+    public int compareTo(isRemoteBlobExists_args other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      lastComparison = 
java.lang.Boolean.valueOf(is_set_blobKey()).compareTo(other.is_set_blobKey());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (is_set_blobKey()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.blobKey, 
other.blobKey);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws 
org.apache.thrift.TException {
+      scheme(iprot).read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws 
org.apache.thrift.TException {
+      scheme(oprot).write(oprot, this);
+    }
+
+    @Override
+    public java.lang.String toString() {
+      java.lang.StringBuilder sb = new 
java.lang.StringBuilder("isRemoteBlobExists_args(");
+      boolean first = true;
+
+      sb.append("blobKey:");
+      if (this.blobKey == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.blobKey);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws 
java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws 
java.io.IOException, java.lang.ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class isRemoteBlobExists_argsStandardSchemeFactory 
implements org.apache.thrift.scheme.SchemeFactory {
+      public isRemoteBlobExists_argsStandardScheme getScheme() {
+        return new isRemoteBlobExists_argsStandardScheme();
+      }
+    }
+
+    private static class isRemoteBlobExists_argsStandardScheme extends 
org.apache.thrift.scheme.StandardScheme<isRemoteBlobExists_args> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, 
isRemoteBlobExists_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 1: // BLOB_KEY
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRING) 
{
+                struct.blobKey = iprot.readString();
+                struct.set_blobKey_isSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, 
isRemoteBlobExists_args struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.blobKey != null) {
+          oprot.writeFieldBegin(BLOB_KEY_FIELD_DESC);
+          oprot.writeString(struct.blobKey);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class isRemoteBlobExists_argsTupleSchemeFactory implements 
org.apache.thrift.scheme.SchemeFactory {
+      public isRemoteBlobExists_argsTupleScheme getScheme() {
+        return new isRemoteBlobExists_argsTupleScheme();
+      }
+    }
+
+    private static class isRemoteBlobExists_argsTupleScheme extends 
org.apache.thrift.scheme.TupleScheme<isRemoteBlobExists_args> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, 
isRemoteBlobExists_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TTupleProtocol oprot = 
(org.apache.thrift.protocol.TTupleProtocol) prot;
+        java.util.BitSet optionals = new java.util.BitSet();
+        if (struct.is_set_blobKey()) {
+          optionals.set(0);
+        }
+        oprot.writeBitSet(optionals, 1);
+        if (struct.is_set_blobKey()) {
+          oprot.writeString(struct.blobKey);
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, 
isRemoteBlobExists_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TTupleProtocol iprot = 
(org.apache.thrift.protocol.TTupleProtocol) prot;
+        java.util.BitSet incoming = iprot.readBitSet(1);
+        if (incoming.get(0)) {
+          struct.blobKey = iprot.readString();
+          struct.set_blobKey_isSet(true);
+        }
+      }
+    }
+
+    private static <S extends org.apache.thrift.scheme.IScheme> S 
scheme(org.apache.thrift.protocol.TProtocol proto) {
+      return 
(org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? 
STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+    }
+  }
+
+  public static class isRemoteBlobExists_result implements 
org.apache.thrift.TBase<isRemoteBlobExists_result, 
isRemoteBlobExists_result._Fields>, java.io.Serializable, Cloneable, 
Comparable<isRemoteBlobExists_result>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new 
org.apache.thrift.protocol.TStruct("isRemoteBlobExists_result");
+
+    private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC 
= new org.apache.thrift.protocol.TField("success", 
org.apache.thrift.protocol.TType.BOOL, (short)0);
+    private static final org.apache.thrift.protocol.TField AZE_FIELD_DESC = 
new org.apache.thrift.protocol.TField("aze", 
org.apache.thrift.protocol.TType.STRUCT, (short)1);
+
+    private static final org.apache.thrift.scheme.SchemeFactory 
STANDARD_SCHEME_FACTORY = new isRemoteBlobExists_resultStandardSchemeFactory();
+    private static final org.apache.thrift.scheme.SchemeFactory 
TUPLE_SCHEME_FACTORY = new isRemoteBlobExists_resultTupleSchemeFactory();
+
+    private boolean success; // required
+    private AuthorizationException aze; // required
+
+    /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      SUCCESS((short)0, "success"),
+      AZE((short)1, "aze");
+
+      private static final java.util.Map<java.lang.String, _Fields> byName = 
new java.util.HashMap<java.lang.String, _Fields>();
+
+      static {
+        for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not 
found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 0: // SUCCESS
+            return SUCCESS;
+          case 1: // AZE
+            return AZE;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new 
java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(java.lang.String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final java.lang.String _fieldName;
+
+      _Fields(short thriftId, java.lang.String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public java.lang.String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    private static final int __SUCCESS_ISSET_ID = 0;
+    private byte __isset_bitfield = 0;
+    public static final java.util.Map<_Fields, 
org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap 
= new java.util.EnumMap<_Fields, 
org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.SUCCESS, new 
org.apache.thrift.meta_data.FieldMetaData("success", 
org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
+      tmpMap.put(_Fields.AZE, new 
org.apache.thrift.meta_data.FieldMetaData("aze", 
org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new 
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
 AuthorizationException.class)));
+      metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+      
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(isRemoteBlobExists_result.class,
 metaDataMap);
+    }
+
+    public isRemoteBlobExists_result() {
+    }
+
+    public isRemoteBlobExists_result(
+      boolean success,
+      AuthorizationException aze)
+    {
+      this();
+      this.success = success;
+      set_success_isSet(true);
+      this.aze = aze;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public isRemoteBlobExists_result(isRemoteBlobExists_result other) {
+      __isset_bitfield = other.__isset_bitfield;
+      this.success = other.success;
+      if (other.is_set_aze()) {
+        this.aze = new AuthorizationException(other.aze);
+      }
+    }
+
+    public isRemoteBlobExists_result deepCopy() {
+      return new isRemoteBlobExists_result(this);
+    }
+
+    @Override
+    public void clear() {
+      set_success_isSet(false);
+      this.success = false;
+      this.aze = null;
+    }
+
+    public boolean is_success() {
+      return this.success;
+    }
+
+    public void set_success(boolean success) {
+      this.success = success;
+      set_success_isSet(true);
+    }
+
+    public void unset_success() {
+      __isset_bitfield = 
org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __SUCCESS_ISSET_ID);
+    }
+
+    /** Returns true if field success is set (has been assigned a value) and 
false otherwise */
+    public boolean is_set_success() {
+      return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, 
__SUCCESS_ISSET_ID);
+    }
+
+    public void set_success_isSet(boolean value) {
+      __isset_bitfield = 
org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __SUCCESS_ISSET_ID, 
value);
+    }
+
+    public AuthorizationException get_aze() {
+      return this.aze;
+    }
+
+    public void set_aze(AuthorizationException aze) {
+      this.aze = aze;
+    }
+
+    public void unset_aze() {
+      this.aze = null;
+    }
+
+    /** Returns true if field aze is set (has been assigned a value) and false 
otherwise */
+    public boolean is_set_aze() {
+      return this.aze != null;
+    }
+
+    public void set_aze_isSet(boolean value) {
+      if (!value) {
+        this.aze = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, java.lang.Object value) {
+      switch (field) {
+      case SUCCESS:
+        if (value == null) {
+          unset_success();
+        } else {
+          set_success((java.lang.Boolean)value);
+        }
+        break;
+
+      case AZE:
+        if (value == null) {
+          unset_aze();
+        } else {
+          set_aze((AuthorizationException)value);
+        }
+        break;
+
+      }
+    }
+
+    public java.lang.Object getFieldValue(_Fields field) {
+      switch (field) {
+      case SUCCESS:
+        return is_success();
+
+      case AZE:
+        return get_aze();
+
+      }
+      throw new java.lang.IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been 
assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new java.lang.IllegalArgumentException();
+      }
+
+      switch (field) {
+      case SUCCESS:
+        return is_set_success();
+      case AZE:
+        return is_set_aze();
+      }
+      throw new java.lang.IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(java.lang.Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof isRemoteBlobExists_result)
+        return this.equals((isRemoteBlobExists_result)that);
+      return false;
+    }
+
+    public boolean equals(isRemoteBlobExists_result that) {
+      if (that == null)
+        return false;
+      if (this == that)
+        return true;
+
+      boolean this_present_success = true;
+      boolean that_present_success = true;
+      if (this_present_success || that_present_success) {
+        if (!(this_present_success && that_present_success))
+          return false;
+        if (this.success != that.success)
+          return false;
+      }
+
+      boolean this_present_aze = true && this.is_set_aze();
+      boolean that_present_aze = true && that.is_set_aze();
+      if (this_present_aze || that_present_aze) {
+        if (!(this_present_aze && that_present_aze))
+          return false;
+        if (!this.aze.equals(that.aze))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int hashCode = 1;
+
+      hashCode = hashCode * 8191 + ((success) ? 131071 : 524287);
+
+      hashCode = hashCode * 8191 + ((is_set_aze()) ? 131071 : 524287);
+      if (is_set_aze())
+        hashCode = hashCode * 8191 + aze.hashCode();
+
+      return hashCode;
+    }
+
+    @Override
+    public int compareTo(isRemoteBlobExists_result other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      lastComparison = 
java.lang.Boolean.valueOf(is_set_success()).compareTo(other.is_set_success());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (is_set_success()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.success, 
other.success);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      lastComparison = 
java.lang.Boolean.valueOf(is_set_aze()).compareTo(other.is_set_aze());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (is_set_aze()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.aze, 
other.aze);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws 
org.apache.thrift.TException {
+      scheme(iprot).read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws 
org.apache.thrift.TException {
+      scheme(oprot).write(oprot, this);
+      }
+
+    @Override
+    public java.lang.String toString() {
+      java.lang.StringBuilder sb = new 
java.lang.StringBuilder("isRemoteBlobExists_result(");
+      boolean first = true;
+
+      sb.append("success:");
+      sb.append(this.success);
+      first = false;
+      if (!first) sb.append(", ");
+      sb.append("aze:");
+      if (this.aze == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.aze);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws 
java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws 
java.io.IOException, java.lang.ClassNotFoundException {
+      try {
+        // it doesn't seem like you should have to do this, but java 
serialization is wacky, and doesn't call the default constructor.
+        __isset_bitfield = 0;
+        read(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class isRemoteBlobExists_resultStandardSchemeFactory 
implements org.apache.thrift.scheme.SchemeFactory {
+      public isRemoteBlobExists_resultStandardScheme getScheme() {
+        return new isRemoteBlobExists_resultStandardScheme();
+      }
+    }
+
+    private static class isRemoteBlobExists_resultStandardScheme extends 
org.apache.thrift.scheme.StandardScheme<isRemoteBlobExists_result> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, 
isRemoteBlobExists_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 0: // SUCCESS
+              if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
+                struct.success = iprot.readBool();
+                struct.set_success_isSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+              }
+              break;
+            case 1: // AZE
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) 
{
+                struct.aze = new AuthorizationException();
+                struct.aze.read(iprot);
+                struct.set_aze_isSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, 
isRemoteBlobExists_result struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.is_set_success()) {
+          oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
+          oprot.writeBool(struct.success);
+          oprot.writeFieldEnd();
+        }
+        if (struct.aze != null) {
+          oprot.writeFieldBegin(AZE_FIELD_DESC);
+          struct.aze.write(oprot);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class isRemoteBlobExists_resultTupleSchemeFactory 
implements org.apache.thrift.scheme.SchemeFactory {
+      public isRemoteBlobExists_resultTupleScheme getScheme() {
+        return new isRemoteBlobExists_resultTupleScheme();
+      }
+    }
+
+    private static class isRemoteBlobExists_resultTupleScheme extends 
org.apache.thrift.scheme.TupleScheme<isRemoteBlobExists_result> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, 
isRemoteBlobExists_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TTupleProtocol oprot = 
(org.apache.thrift.protocol.TTupleProtocol) prot;
+        java.util.BitSet optionals = new java.util.BitSet();
+        if (struct.is_set_success()) {
+          optionals.set(0);
+        }
+        if (struct.is_set_aze()) {
+          optionals.set(1);
+        }
+        oprot.writeBitSet(optionals, 2);
+        if (struct.is_set_success()) {
+          oprot.writeBool(struct.success);
+        }
+        if (struct.is_set_aze()) {
+          struct.aze.write(oprot);
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, 
isRemoteBlobExists_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TTupleProtocol iprot = 
(org.apache.thrift.protocol.TTupleProtocol) prot;
+        java.util.BitSet incoming = iprot.readBitSet(2);
+        if (incoming.get(0)) {
+          struct.success = iprot.readBool();
+          struct.set_success_isSet(true);
+        }
+        if (incoming.get(1)) {
+          struct.aze = new AuthorizationException();
+          struct.aze.read(iprot);
+          struct.set_aze_isSet(true);
+        }
+      }
+    }
+
+    private static <S extends org.apache.thrift.scheme.IScheme> S 
scheme(org.apache.thrift.protocol.TProtocol proto) {
+      return 
(org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? 
STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-client/src/py/storm/Nimbus-remote
----------------------------------------------------------------------
diff --git a/storm-client/src/py/storm/Nimbus-remote 
b/storm-client/src/py/storm/Nimbus-remote
index 280c42b..3c35d1d 100644
--- a/storm-client/src/py/storm/Nimbus-remote
+++ b/storm-client/src/py/storm/Nimbus-remote
@@ -92,6 +92,7 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help':
     print('  void sendSupervisorWorkerHeartbeats(SupervisorWorkerHeartbeats 
heartbeats)')
     print('  void sendSupervisorWorkerHeartbeat(SupervisorWorkerHeartbeat 
heatbeat)')
     print('  void processWorkerMetrics(WorkerMetrics metrics)')
+    print('  bool isRemoteBlobExists(string blobKey)')
     print('')
     sys.exit(0)
 
@@ -471,6 +472,12 @@ elif cmd == 'processWorkerMetrics':
         sys.exit(1)
     pp.pprint(client.processWorkerMetrics(eval(args[0]),))
 
+elif cmd == 'isRemoteBlobExists':
+    if len(args) != 1:
+        print('isRemoteBlobExists requires 1 args')
+        sys.exit(1)
+    pp.pprint(client.isRemoteBlobExists(args[0],))
+
 else:
     print('Unrecognized method %s' % cmd)
     sys.exit(1)

http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-client/src/py/storm/Nimbus.py
----------------------------------------------------------------------
diff --git a/storm-client/src/py/storm/Nimbus.py 
b/storm-client/src/py/storm/Nimbus.py
index ad16141..182dbe2 100644
--- a/storm-client/src/py/storm/Nimbus.py
+++ b/storm-client/src/py/storm/Nimbus.py
@@ -417,6 +417,15 @@ class Iface(object):
         """
         pass
 
+    def isRemoteBlobExists(self, blobKey):
+        """
+        Check whether the blob still exists in the cluster (True if it exists, False if it was removed).
+
+        Parameters:
+         - blobKey
+        """
+        pass
+
 
 class Client(Iface):
     def __init__(self, iprot, oprot=None):
@@ -2133,6 +2142,41 @@ class Client(Iface):
         iprot.readMessageEnd()
         return
 
+    def isRemoteBlobExists(self, blobKey):
+        """
+        Check whether the blob still exists in the cluster (True if it exists, False if it was removed).
+
+        Parameters:
+         - blobKey
+        """
+        self.send_isRemoteBlobExists(blobKey)
+        return self.recv_isRemoteBlobExists()
+
+    def send_isRemoteBlobExists(self, blobKey):
+        self._oprot.writeMessageBegin('isRemoteBlobExists', TMessageType.CALL, 
self._seqid)
+        args = isRemoteBlobExists_args()
+        args.blobKey = blobKey
+        args.write(self._oprot)
+        self._oprot.writeMessageEnd()
+        self._oprot.trans.flush()
+
+    def recv_isRemoteBlobExists(self):
+        iprot = self._iprot
+        (fname, mtype, rseqid) = iprot.readMessageBegin()
+        if mtype == TMessageType.EXCEPTION:
+            x = TApplicationException()
+            x.read(iprot)
+            iprot.readMessageEnd()
+            raise x
+        result = isRemoteBlobExists_result()
+        result.read(iprot)
+        iprot.readMessageEnd()
+        if result.success is not None:
+            return result.success
+        if result.aze is not None:
+            raise result.aze
+        raise TApplicationException(TApplicationException.MISSING_RESULT, 
"isRemoteBlobExists failed: unknown result")
+
 
 class Processor(Iface, TProcessor):
     def __init__(self, handler):
@@ -2188,6 +2232,7 @@ class Processor(Iface, TProcessor):
         self._processMap["sendSupervisorWorkerHeartbeats"] = 
Processor.process_sendSupervisorWorkerHeartbeats
         self._processMap["sendSupervisorWorkerHeartbeat"] = 
Processor.process_sendSupervisorWorkerHeartbeat
         self._processMap["processWorkerMetrics"] = 
Processor.process_processWorkerMetrics
+        self._processMap["isRemoteBlobExists"] = 
Processor.process_isRemoteBlobExists
 
     def process(self, iprot, oprot):
         (name, type, seqid) = iprot.readMessageBegin()
@@ -3573,6 +3618,32 @@ class Processor(Iface, TProcessor):
         oprot.writeMessageEnd()
         oprot.trans.flush()
 
+    def process_isRemoteBlobExists(self, seqid, iprot, oprot):
+        args = isRemoteBlobExists_args()
+        args.read(iprot)
+        iprot.readMessageEnd()
+        result = isRemoteBlobExists_result()
+        try:
+            result.success = self._handler.isRemoteBlobExists(args.blobKey)
+            msg_type = TMessageType.REPLY
+        except TTransport.TTransportException:
+            raise
+        except AuthorizationException as aze:
+            msg_type = TMessageType.REPLY
+            result.aze = aze
+        except TApplicationException as ex:
+            logging.exception('TApplication exception in handler')
+            msg_type = TMessageType.EXCEPTION
+            result = ex
+        except Exception:
+            logging.exception('Unexpected exception in handler')
+            msg_type = TMessageType.EXCEPTION
+            result = 
TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+        oprot.writeMessageBegin("isRemoteBlobExists", msg_type, seqid)
+        result.write(oprot)
+        oprot.writeMessageEnd()
+        oprot.trans.flush()
+
 # HELPER FUNCTIONS AND STRUCTURES
 
 
@@ -10642,6 +10713,140 @@ class processWorkerMetrics_result(object):
 all_structs.append(processWorkerMetrics_result)
 processWorkerMetrics_result.thrift_spec = (
 )
+
+
+class isRemoteBlobExists_args(object):
+    """
+    Attributes:
+     - blobKey
+    """
+
+
+    def __init__(self, blobKey=None,):
+        self.blobKey = blobKey
+
+    def read(self, iprot):
+        if iprot._fast_decode is not None and isinstance(iprot.trans, 
TTransport.CReadableTransport) and self.thrift_spec is not None:
+            iprot._fast_decode(self, iprot, [self.__class__, self.thrift_spec])
+            return
+        iprot.readStructBegin()
+        while True:
+            (fname, ftype, fid) = iprot.readFieldBegin()
+            if ftype == TType.STOP:
+                break
+            if fid == 1:
+                if ftype == TType.STRING:
+                    self.blobKey = iprot.readString().decode('utf-8') if 
sys.version_info[0] == 2 else iprot.readString()
+                else:
+                    iprot.skip(ftype)
+            else:
+                iprot.skip(ftype)
+            iprot.readFieldEnd()
+        iprot.readStructEnd()
+
+    def write(self, oprot):
+        if oprot._fast_encode is not None and self.thrift_spec is not None:
+            oprot.trans.write(oprot._fast_encode(self, [self.__class__, 
self.thrift_spec]))
+            return
+        oprot.writeStructBegin('isRemoteBlobExists_args')
+        if self.blobKey is not None:
+            oprot.writeFieldBegin('blobKey', TType.STRING, 1)
+            oprot.writeString(self.blobKey.encode('utf-8') if 
sys.version_info[0] == 2 else self.blobKey)
+            oprot.writeFieldEnd()
+        oprot.writeFieldStop()
+        oprot.writeStructEnd()
+
+    def validate(self):
+        return
+
+    def __repr__(self):
+        L = ['%s=%r' % (key, value)
+             for key, value in self.__dict__.items()]
+        return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+    def __eq__(self, other):
+        return isinstance(other, self.__class__) and self.__dict__ == 
other.__dict__
+
+    def __ne__(self, other):
+        return not (self == other)
+all_structs.append(isRemoteBlobExists_args)
+isRemoteBlobExists_args.thrift_spec = (
+    None,  # 0
+    (1, TType.STRING, 'blobKey', 'UTF8', None, ),  # 1
+)
+
+
+class isRemoteBlobExists_result(object):
+    """
+    Attributes:
+     - success
+     - aze
+    """
+
+
+    def __init__(self, success=None, aze=None,):
+        self.success = success
+        self.aze = aze
+
+    def read(self, iprot):
+        if iprot._fast_decode is not None and isinstance(iprot.trans, 
TTransport.CReadableTransport) and self.thrift_spec is not None:
+            iprot._fast_decode(self, iprot, [self.__class__, self.thrift_spec])
+            return
+        iprot.readStructBegin()
+        while True:
+            (fname, ftype, fid) = iprot.readFieldBegin()
+            if ftype == TType.STOP:
+                break
+            if fid == 0:
+                if ftype == TType.BOOL:
+                    self.success = iprot.readBool()
+                else:
+                    iprot.skip(ftype)
+            elif fid == 1:
+                if ftype == TType.STRUCT:
+                    self.aze = AuthorizationException()
+                    self.aze.read(iprot)
+                else:
+                    iprot.skip(ftype)
+            else:
+                iprot.skip(ftype)
+            iprot.readFieldEnd()
+        iprot.readStructEnd()
+
+    def write(self, oprot):
+        if oprot._fast_encode is not None and self.thrift_spec is not None:
+            oprot.trans.write(oprot._fast_encode(self, [self.__class__, 
self.thrift_spec]))
+            return
+        oprot.writeStructBegin('isRemoteBlobExists_result')
+        if self.success is not None:
+            oprot.writeFieldBegin('success', TType.BOOL, 0)
+            oprot.writeBool(self.success)
+            oprot.writeFieldEnd()
+        if self.aze is not None:
+            oprot.writeFieldBegin('aze', TType.STRUCT, 1)
+            self.aze.write(oprot)
+            oprot.writeFieldEnd()
+        oprot.writeFieldStop()
+        oprot.writeStructEnd()
+
+    def validate(self):
+        return
+
+    def __repr__(self):
+        L = ['%s=%r' % (key, value)
+             for key, value in self.__dict__.items()]
+        return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+    def __eq__(self, other):
+        return isinstance(other, self.__class__) and self.__dict__ == 
other.__dict__
+
+    def __ne__(self, other):
+        return not (self == other)
+all_structs.append(isRemoteBlobExists_result)
+isRemoteBlobExists_result.thrift_spec = (
+    (0, TType.BOOL, 'success', None, None, ),  # 0
+    (1, TType.STRUCT, 'aze', [AuthorizationException, None], None, ),  # 1
+)
 fix_spec(all_structs)
 del all_structs
 

http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-client/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-client/src/storm.thrift b/storm-client/src/storm.thrift
index e0935bd..fefc6db 100644
--- a/storm-client/src/storm.thrift
+++ b/storm-client/src/storm.thrift
@@ -797,6 +797,10 @@ service Nimbus {
    */
   void sendSupervisorWorkerHeartbeat(1: SupervisorWorkerHeartbeat heatbeat) 
throws (1: AuthorizationException aze, 2: NotAliveException e);
   void processWorkerMetrics(1: WorkerMetrics metrics);
+  /**
+   * Check whether the blob still exists in the cluster (true if it exists, false if it was removed).
+   */
+  bool isRemoteBlobExists(1: string blobKey) throws (1: AuthorizationException 
aze);
 }
 
 struct DRPCRequest {

http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-client/test/jvm/org/apache/storm/blobstore/ClientBlobStoreTest.java
----------------------------------------------------------------------
diff --git 
a/storm-client/test/jvm/org/apache/storm/blobstore/ClientBlobStoreTest.java 
b/storm-client/test/jvm/org/apache/storm/blobstore/ClientBlobStoreTest.java
index b1a9b0f..081cc7b 100644
--- a/storm-client/test/jvm/org/apache/storm/blobstore/ClientBlobStoreTest.java
+++ b/storm-client/test/jvm/org/apache/storm/blobstore/ClientBlobStoreTest.java
@@ -138,6 +138,11 @@ public class ClientBlobStoreTest {
         }
 
         @Override
+        public boolean isRemoteBlobExists(String blobKey) throws 
AuthorizationException {
+            return allBlobs.containsKey(blobKey);
+        }
+
+        @Override
         protected void setBlobMetaToExtend(String key, SettableBlobMeta meta) 
throws AuthorizationException, KeyNotFoundException {
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-server/src/main/java/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java 
b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
index 5f7d064..634ca35 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
@@ -483,6 +483,11 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
     }
 
     @Override
+    public boolean isRemoteBlobExists(String blobKey) throws 
AuthorizationException, TException {
+        throw new KeyNotFoundException("BLOBS NOT SUPPORTED IN LOCAL MODE");
+    }
+
+    @Override
     public synchronized void close() throws Exception {
         if (nimbusOverride != null) {
             nimbusOverride.close();

http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java 
b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index be4360e..0c27268 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -4472,6 +4472,16 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         }
     }
 
+    @Override
+    public boolean isRemoteBlobExists(String blobKey) throws 
AuthorizationException, TException {
+        try {
+            blobStore.getBlobMeta(blobKey, getSubject());
+        } catch (KeyNotFoundException e) {
+            return false;
+        }
+        return true;
+    }
+
     private static final class Assoc<K, V> implements UnaryOperator<Map<K, V>> 
{
         private final K key;
         private final V value;

http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
 
b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
index 15eba02..fe5246e 100644
--- 
a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
+++ 
b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
@@ -88,17 +88,17 @@ public class LocalizedResourceRetentionSet {
             Map.Entry<LocallyCachedBlob, Map<String, ? extends 
LocallyCachedBlob>> rsrc = i.next();
             LocallyCachedBlob resource = rsrc.getKey();
             try {
-                resource.getRemoteVersion(store);
+                if (!store.isRemoteBlobExists(resource.getKey())) {
+                    //The key was removed so we should delete it too.
+                    Map<String, ? extends LocallyCachedBlob> set = 
rsrc.getValue();
+                    if (removeBlob(resource, set)) {
+                        bytesOver -= resource.getSizeOnDisk();
+                        LOG.info("Deleted blob: {} (REMOVED FROM CLUSTER).", 
resource.getKey());
+                        i.remove();
+                    }
+                }
             } catch (AuthorizationException e) {
                 //Ignored
-            } catch (KeyNotFoundException e) {
-                //The key was removed so we should delete it too.
-                Map<String, ? extends LocallyCachedBlob> set = rsrc.getValue();
-                if (removeBlob(resource, set)) {
-                    bytesOver -= resource.getSizeOnDisk();
-                    LOG.info("Deleted blob: {} (KEY NOT FOUND).", 
resource.getKey());
-                    i.remove();
-                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java 
b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index e0a8275..938301f 100644
--- 
a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ 
b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -597,6 +597,7 @@ public class AsyncLocalizerTest {
             ReadableBlobMeta rbm = new ReadableBlobMeta();
             rbm.set_settable(new SettableBlobMeta(WORLD_EVERYTHING));
             when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
+            
when(mockblobstore.isRemoteBlobExists(anyString())).thenReturn(true);
             when(mockblobstore.getBlob(key1)).thenReturn(new 
TestInputStreamWithMeta(0));
             when(mockblobstore.getBlob(key2)).thenReturn(new 
TestInputStreamWithMeta(0));
             when(mockblobstore.getBlob(key3)).thenReturn(new 
TestInputStreamWithMeta(0));

Reply via email to