Repository: storm Updated Branches: refs/heads/master 9861f9a71 -> 64af629a1
[STORM-3036] Add isRemoteBlobExists RPC interface for deciding if remote blob exists. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/26c4bec9 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/26c4bec9 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/26c4bec9 Branch: refs/heads/master Commit: 26c4bec99efec800b3824db67ef88f5d856354a7 Parents: 9e97a07 Author: chenyuzhao <[email protected]> Authored: Fri May 25 00:34:14 2018 +0800 Committer: chenyuzhao <[email protected]> Committed: Fri May 25 00:34:14 2018 +0800 ---------------------------------------------------------------------- .../hdfs/blobstore/HdfsClientBlobStore.java | 10 + .../apache/storm/blobstore/ClientBlobStore.java | 6 + .../blobstore/LocalModeClientBlobStore.java | 10 + .../apache/storm/blobstore/NimbusBlobStore.java | 11 + .../jvm/org/apache/storm/generated/Nimbus.java | 986 +++++++++++++++++++ storm-client/src/py/storm/Nimbus-remote | 7 + storm-client/src/py/storm/Nimbus.py | 205 ++++ storm-client/src/storm.thrift | 4 + .../storm/blobstore/ClientBlobStoreTest.java | 5 + .../java/org/apache/storm/LocalCluster.java | 5 + .../org/apache/storm/daemon/nimbus/Nimbus.java | 10 + .../LocalizedResourceRetentionSet.java | 18 +- .../storm/localizer/AsyncLocalizerTest.java | 1 + 13 files changed, 1269 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java index c39904c..419cf997 100644 --- a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java +++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java @@ -73,6 +73,16 @@ public class HdfsClientBlobStore extends ClientBlobStore { } @Override + public boolean isRemoteBlobExists(String blobKey) throws AuthorizationException { + try { + _blobStore.getBlob(blobKey, null); + } catch (KeyNotFoundException e) { + return false; + } + return true; + } + + @Override public void setBlobMetaToExtend(String key, SettableBlobMeta meta) throws AuthorizationException, KeyNotFoundException { _blobStore.setBlobMeta(key, meta, null); http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java index 57e8578..f01c74e 100644 --- a/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java +++ b/storm-client/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java @@ -91,6 +91,12 @@ public abstract class ClientBlobStore implements Shutdownable, AutoCloseable { public abstract ReadableBlobMeta getBlobMeta(String key) throws AuthorizationException, KeyNotFoundException; /** + * Decide if the blob is deleted from cluster. + * @param blobKey blob key + */ + public abstract boolean isRemoteBlobExists(String blobKey) throws AuthorizationException; + + /** * Client facing API to set the metadata for a blob. * * @param key blob key name. http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-client/src/jvm/org/apache/storm/blobstore/LocalModeClientBlobStore.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/LocalModeClientBlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/LocalModeClientBlobStore.java index 0f8287c..70ed6c2 100644 --- a/storm-client/src/jvm/org/apache/storm/blobstore/LocalModeClientBlobStore.java +++ b/storm-client/src/jvm/org/apache/storm/blobstore/LocalModeClientBlobStore.java @@ -58,6 +58,16 @@ public class LocalModeClientBlobStore extends ClientBlobStore { } @Override + public boolean isRemoteBlobExists(String blobKey) throws AuthorizationException { + try { + wrapped.getBlob(blobKey, null); + } catch (KeyNotFoundException e) { + return false; + } + return true; + } + + @Override protected void setBlobMetaToExtend(String key, SettableBlobMeta meta) throws AuthorizationException, KeyNotFoundException { wrapped.setBlobMeta(key, meta, null); } http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java index 152f1f1..fd58062 100644 --- a/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java +++ b/storm-client/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java @@ -92,6 +92,17 @@ public class NimbusBlobStore extends ClientBlobStore implements AutoCloseable { } @Override + public boolean isRemoteBlobExists(String blobKey) throws AuthorizationException { + try { + return client.getClient().isRemoteBlobExists(blobKey); + } catch (AuthorizationException aze) { + throw aze; + } catch (TException e) { + throw new RuntimeException(e); + } + } + + @Override protected void setBlobMetaToExtend(String key, SettableBlobMeta meta) throws AuthorizationException, KeyNotFoundException { try { http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java b/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java index ac105e1..3955ddc 100644 --- a/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java +++ b/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java @@ -166,6 +166,13 @@ public class Nimbus { public void processWorkerMetrics(WorkerMetrics metrics) throws org.apache.thrift.TException; + /** + * Decide if the blob is removed from cluster. + * + * @param blobKey + */ + public boolean isRemoteBlobExists(java.lang.String blobKey) throws AuthorizationException, org.apache.thrift.TException; + } public interface AsyncIface { @@ -270,6 +277,8 @@ public class Nimbus { public void processWorkerMetrics(WorkerMetrics metrics, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException; + public void isRemoteBlobExists(java.lang.String blobKey, org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> resultHandler) throws org.apache.thrift.TException; + } public static class Client extends org.apache.thrift.TServiceClient implements Iface { @@ -1621,6 +1630,32 @@ public class Nimbus { return; } + public boolean isRemoteBlobExists(java.lang.String blobKey) throws AuthorizationException, org.apache.thrift.TException + { + send_isRemoteBlobExists(blobKey); + return recv_isRemoteBlobExists(); + } + + public void send_isRemoteBlobExists(java.lang.String blobKey) throws org.apache.thrift.TException + { + isRemoteBlobExists_args args = new isRemoteBlobExists_args(); + args.set_blobKey(blobKey); + sendBase("isRemoteBlobExists", args); + } + + public boolean recv_isRemoteBlobExists() throws AuthorizationException, org.apache.thrift.TException + { + isRemoteBlobExists_result result = new isRemoteBlobExists_result(); + receiveBase(result, "isRemoteBlobExists"); + if (result.is_set_success()) { + return result.success; + } + if (result.aze != null) { + throw result.aze; + } + throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "isRemoteBlobExists failed: unknown result"); + } + } public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface { public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> { @@ -3317,6 +3352,38 @@ public class Nimbus { } } + public void isRemoteBlobExists(java.lang.String blobKey, org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> resultHandler) throws org.apache.thrift.TException { + checkReady(); + isRemoteBlobExists_call method_call = new isRemoteBlobExists_call(blobKey, resultHandler, this, ___protocolFactory, ___transport); + this.___currentMethod = method_call; + ___manager.call(method_call); + } + + public static class isRemoteBlobExists_call extends org.apache.thrift.async.TAsyncMethodCall<java.lang.Boolean> { + private java.lang.String blobKey; + public isRemoteBlobExists_call(java.lang.String blobKey, org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { + super(client, protocolFactory, transport, resultHandler, false); + this.blobKey = blobKey; + } + + public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { + prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("isRemoteBlobExists", org.apache.thrift.protocol.TMessageType.CALL, 0)); + isRemoteBlobExists_args args = new isRemoteBlobExists_args(); + args.set_blobKey(blobKey); + args.write(prot); + prot.writeMessageEnd(); + } + + public java.lang.Boolean getResult() throws AuthorizationException, org.apache.thrift.TException { + if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) { + throw new java.lang.IllegalStateException("Method call not finished!"); + } + org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array()); + org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport); + return (new Client(prot)).recv_isRemoteBlobExists(); + } + } + } public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor { @@ -3380,6 +3447,7 @@ public class Nimbus { processMap.put("sendSupervisorWorkerHeartbeats", new sendSupervisorWorkerHeartbeats()); processMap.put("sendSupervisorWorkerHeartbeat", new sendSupervisorWorkerHeartbeat()); processMap.put("processWorkerMetrics", new processWorkerMetrics()); + processMap.put("isRemoteBlobExists", new isRemoteBlobExists()); return processMap; } @@ -4868,6 +4936,36 @@ public class Nimbus { } } + public static class isRemoteBlobExists<I extends Iface> extends org.apache.thrift.ProcessFunction<I, isRemoteBlobExists_args> { + public isRemoteBlobExists() { + super("isRemoteBlobExists"); + } + + public isRemoteBlobExists_args getEmptyArgsInstance() { + return new isRemoteBlobExists_args(); + } + + protected boolean isOneway() { + return false; + } + + @Override + protected boolean handleRuntimeExceptions() { + return false; + } + + public isRemoteBlobExists_result getResult(I iface, isRemoteBlobExists_args args) throws org.apache.thrift.TException { + isRemoteBlobExists_result result = new isRemoteBlobExists_result(); + try { + result.success = iface.isRemoteBlobExists(args.blobKey); + result.set_success_isSet(true); + } catch (AuthorizationException aze) { + result.aze = aze; + } + return result; + } + } + } public static class AsyncProcessor<I extends AsyncIface> extends org.apache.thrift.TBaseAsyncProcessor<I> { @@ -4931,6 +5029,7 @@ public class Nimbus { processMap.put("sendSupervisorWorkerHeartbeats", new sendSupervisorWorkerHeartbeats()); processMap.put("sendSupervisorWorkerHeartbeat", new sendSupervisorWorkerHeartbeat()); processMap.put("processWorkerMetrics", new processWorkerMetrics()); + processMap.put("isRemoteBlobExists", new isRemoteBlobExists()); return processMap; } @@ -8257,6 +8356,72 @@ public class Nimbus { } } + public static class isRemoteBlobExists<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, isRemoteBlobExists_args, java.lang.Boolean> { + public isRemoteBlobExists() { + super("isRemoteBlobExists"); + } + + public isRemoteBlobExists_args getEmptyArgsInstance() { + return new isRemoteBlobExists_args(); + } + + public org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> getResultHandler(final org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) { + final org.apache.thrift.AsyncProcessFunction fcall = this; + return new org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean>() { + public void onComplete(java.lang.Boolean o) { + isRemoteBlobExists_result result = new isRemoteBlobExists_result(); + result.success = o; + result.set_success_isSet(true); + try { + fcall.sendResponse(fb, result, org.apache.thrift.protocol.TMessageType.REPLY,seqid); + } catch (org.apache.thrift.transport.TTransportException e) { + _LOGGER.error("TTransportException writing to internal frame buffer", e); + fb.close(); + } catch (java.lang.Exception e) { + _LOGGER.error("Exception writing to internal frame buffer", e); + onError(e); + } + } + public void onError(java.lang.Exception e) { + byte msgType = org.apache.thrift.protocol.TMessageType.REPLY; + org.apache.thrift.TSerializable msg; + isRemoteBlobExists_result result = new isRemoteBlobExists_result(); + if (e instanceof AuthorizationException) { + result.aze = (AuthorizationException) e; + result.set_aze_isSet(true); + msg = result; + } else if (e instanceof org.apache.thrift.transport.TTransportException) { + _LOGGER.error("TTransportException inside handler", e); + fb.close(); + return; + } else if (e instanceof org.apache.thrift.TApplicationException) { + _LOGGER.error("TApplicationException inside handler", e); + msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION; + msg = (org.apache.thrift.TApplicationException)e; + } else { + _LOGGER.error("Exception inside handler", e); + msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION; + msg = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage()); + } + try { + fcall.sendResponse(fb,msg,msgType,seqid); + } catch (java.lang.Exception ex) { + _LOGGER.error("Exception writing to internal frame buffer", ex); + fb.close(); + } + } + }; + } + + protected boolean isOneway() { + return false; + } + + public void start(I iface, isRemoteBlobExists_args args, org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> resultHandler) throws org.apache.thrift.TException { + iface.isRemoteBlobExists(args.blobKey,resultHandler); + } + } + } public static class submitTopology_args implements org.apache.thrift.TBase<submitTopology_args, submitTopology_args._Fields>, java.io.Serializable, Cloneable, Comparable<submitTopology_args> { @@ -52455,4 +52620,825 @@ public class Nimbus { } } + public static class isRemoteBlobExists_args implements org.apache.thrift.TBase<isRemoteBlobExists_args, isRemoteBlobExists_args._Fields>, java.io.Serializable, Cloneable, Comparable<isRemoteBlobExists_args> { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("isRemoteBlobExists_args"); + + private static final org.apache.thrift.protocol.TField BLOB_KEY_FIELD_DESC = new org.apache.thrift.protocol.TField("blobKey", org.apache.thrift.protocol.TType.STRING, (short)1); + + private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new isRemoteBlobExists_argsStandardSchemeFactory(); + private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new isRemoteBlobExists_argsTupleSchemeFactory(); + + private java.lang.String blobKey; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + BLOB_KEY((short)1, "blobKey"); + + private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); + + static { + for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // BLOB_KEY + return BLOB_KEY; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(java.lang.String name) { + return byName.get(name); + } + + private final short _thriftId; + private final java.lang.String _fieldName; + + _Fields(short thriftId, java.lang.String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public java.lang.String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.BLOB_KEY, new org.apache.thrift.meta_data.FieldMetaData("blobKey", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(isRemoteBlobExists_args.class, metaDataMap); + } + + public isRemoteBlobExists_args() { + } + + public isRemoteBlobExists_args( + java.lang.String blobKey) + { + this(); + this.blobKey = blobKey; + } + + /** + * Performs a deep copy on <i>other</i>. + */ + public isRemoteBlobExists_args(isRemoteBlobExists_args other) { + if (other.is_set_blobKey()) { + this.blobKey = other.blobKey; + } + } + + public isRemoteBlobExists_args deepCopy() { + return new isRemoteBlobExists_args(this); + } + + @Override + public void clear() { + this.blobKey = null; + } + + public java.lang.String get_blobKey() { + return this.blobKey; + } + + public void set_blobKey(java.lang.String blobKey) { + this.blobKey = blobKey; + } + + public void unset_blobKey() { + this.blobKey = null; + } + + /** Returns true if field blobKey is set (has been assigned a value) and false otherwise */ + public boolean is_set_blobKey() { + return this.blobKey != null; + } + + public void set_blobKey_isSet(boolean value) { + if (!value) { + this.blobKey = null; + } + } + + public void setFieldValue(_Fields field, java.lang.Object value) { + switch (field) { + case BLOB_KEY: + if (value == null) { + unset_blobKey(); + } else { + set_blobKey((java.lang.String)value); + } + break; + + } + } + + public java.lang.Object getFieldValue(_Fields field) { + switch (field) { + case BLOB_KEY: + return get_blobKey(); + + } + throw new java.lang.IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new java.lang.IllegalArgumentException(); + } + + switch (field) { + case BLOB_KEY: + return is_set_blobKey(); + } + throw new java.lang.IllegalStateException(); + } + + @Override + public boolean equals(java.lang.Object that) { + if (that == null) + return false; + if (that instanceof isRemoteBlobExists_args) + return this.equals((isRemoteBlobExists_args)that); + return false; + } + + public boolean equals(isRemoteBlobExists_args that) { + if (that == null) + return false; + if (this == that) + return true; + + boolean this_present_blobKey = true && this.is_set_blobKey(); + boolean that_present_blobKey = true && that.is_set_blobKey(); + if (this_present_blobKey || that_present_blobKey) { + if (!(this_present_blobKey && that_present_blobKey)) + return false; + if (!this.blobKey.equals(that.blobKey)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + int hashCode = 1; + + hashCode = hashCode * 8191 + ((is_set_blobKey()) ? 131071 : 524287); + if (is_set_blobKey()) + hashCode = hashCode * 8191 + blobKey.hashCode(); + + return hashCode; + } + + @Override + public int compareTo(isRemoteBlobExists_args other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = java.lang.Boolean.valueOf(is_set_blobKey()).compareTo(other.is_set_blobKey()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_blobKey()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.blobKey, other.blobKey); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + scheme(iprot).read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + scheme(oprot).write(oprot, this); + } + + @Override + public java.lang.String toString() { + java.lang.StringBuilder sb = new java.lang.StringBuilder("isRemoteBlobExists_args("); + boolean first = true; + + sb.append("blobKey:"); + if (this.blobKey == null) { + sb.append("null"); + } else { + sb.append(this.blobKey); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class isRemoteBlobExists_argsStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public isRemoteBlobExists_argsStandardScheme getScheme() { + return new isRemoteBlobExists_argsStandardScheme(); + } + } + + private static class isRemoteBlobExists_argsStandardScheme extends org.apache.thrift.scheme.StandardScheme<isRemoteBlobExists_args> { + + public void read(org.apache.thrift.protocol.TProtocol iprot, isRemoteBlobExists_args struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // BLOB_KEY + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.blobKey = iprot.readString(); + struct.set_blobKey_isSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, isRemoteBlobExists_args struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.blobKey != null) { + oprot.writeFieldBegin(BLOB_KEY_FIELD_DESC); + oprot.writeString(struct.blobKey); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class isRemoteBlobExists_argsTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public isRemoteBlobExists_argsTupleScheme getScheme() { + return new isRemoteBlobExists_argsTupleScheme(); + } + } + + private static class isRemoteBlobExists_argsTupleScheme extends org.apache.thrift.scheme.TupleScheme<isRemoteBlobExists_args> { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, isRemoteBlobExists_args struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet optionals = new java.util.BitSet(); + if (struct.is_set_blobKey()) { + optionals.set(0); + } + oprot.writeBitSet(optionals, 1); + if (struct.is_set_blobKey()) { + oprot.writeString(struct.blobKey); + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, isRemoteBlobExists_args struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet incoming = iprot.readBitSet(1); + if (incoming.get(0)) { + struct.blobKey = iprot.readString(); + struct.set_blobKey_isSet(true); + } + } + } + + private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) { + return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); + } + } + + public static class isRemoteBlobExists_result implements org.apache.thrift.TBase<isRemoteBlobExists_result, isRemoteBlobExists_result._Fields>, java.io.Serializable, Cloneable, Comparable<isRemoteBlobExists_result> { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("isRemoteBlobExists_result"); + + private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.BOOL, (short)0); + private static final org.apache.thrift.protocol.TField AZE_FIELD_DESC = new org.apache.thrift.protocol.TField("aze", org.apache.thrift.protocol.TType.STRUCT, (short)1); + + private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new isRemoteBlobExists_resultStandardSchemeFactory(); + private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new isRemoteBlobExists_resultTupleSchemeFactory(); + + private boolean success; // required + private AuthorizationException aze; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + SUCCESS((short)0, "success"), + AZE((short)1, "aze"); + + private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); + + static { + for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 0: // SUCCESS + return SUCCESS; + case 1: // AZE + return AZE; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(java.lang.String name) { + return byName.get(name); + } + + private final short _thriftId; + private final java.lang.String _fieldName; + + _Fields(short thriftId, java.lang.String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public java.lang.String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + private static final int __SUCCESS_ISSET_ID = 0; + private byte __isset_bitfield = 0; + public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); + tmpMap.put(_Fields.AZE, new org.apache.thrift.meta_data.FieldMetaData("aze", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, AuthorizationException.class))); + metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(isRemoteBlobExists_result.class, metaDataMap); + } + + public isRemoteBlobExists_result() { + } + + public isRemoteBlobExists_result( + boolean success, + AuthorizationException aze) + { + this(); + this.success = success; + set_success_isSet(true); + this.aze = aze; + } + + /** + * Performs a deep copy on <i>other</i>. + */ + public isRemoteBlobExists_result(isRemoteBlobExists_result other) { + __isset_bitfield = other.__isset_bitfield; + this.success = other.success; + if (other.is_set_aze()) { + this.aze = new AuthorizationException(other.aze); + } + } + + public isRemoteBlobExists_result deepCopy() { + return new isRemoteBlobExists_result(this); + } + + @Override + public void clear() { + set_success_isSet(false); + this.success = false; + this.aze = null; + } + + public boolean is_success() { + return this.success; + } + + public void set_success(boolean success) { + this.success = success; + set_success_isSet(true); + } + + public void unset_success() { + __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __SUCCESS_ISSET_ID); + } + + /** Returns true if field success is set (has been assigned a value) and false otherwise */ + public boolean is_set_success() { + return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __SUCCESS_ISSET_ID); + } + + public void set_success_isSet(boolean value) { + __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __SUCCESS_ISSET_ID, value); + } + + public AuthorizationException get_aze() { + return this.aze; + } + + public void set_aze(AuthorizationException aze) { + this.aze = aze; + } + + public void unset_aze() { + this.aze = null; + } + + /** Returns true if field aze is set (has been assigned a value) and false otherwise */ + public boolean is_set_aze() { + return this.aze != null; + } + + public void set_aze_isSet(boolean value) { + if (!value) { + this.aze = null; + } + } + + public void setFieldValue(_Fields field, java.lang.Object value) { + switch (field) { + case SUCCESS: + if (value == null) { + unset_success(); + } else { + set_success((java.lang.Boolean)value); + } + break; + + case AZE: + if (value == null) { + unset_aze(); + } else { + set_aze((AuthorizationException)value); + } + break; + + } + } + + public java.lang.Object getFieldValue(_Fields field) { + switch (field) { + case SUCCESS: + return is_success(); + + case AZE: + return get_aze(); + + } + throw new java.lang.IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new java.lang.IllegalArgumentException(); + } + + switch (field) { + case SUCCESS: + return is_set_success(); + case AZE: + return is_set_aze(); + } + throw new java.lang.IllegalStateException(); + } + + @Override + public boolean equals(java.lang.Object that) { + if (that == null) + return false; + if (that instanceof isRemoteBlobExists_result) + return this.equals((isRemoteBlobExists_result)that); + return false; + } + + public boolean equals(isRemoteBlobExists_result that) { + if (that == null) + return false; + if (this == that) + return true; + + boolean this_present_success = true; + boolean that_present_success = true; + if (this_present_success || that_present_success) { + if (!(this_present_success && that_present_success)) + return false; + if (this.success != that.success) + return false; + } + + boolean this_present_aze = true && this.is_set_aze(); + boolean that_present_aze = true && that.is_set_aze(); + if (this_present_aze || that_present_aze) { + if (!(this_present_aze && that_present_aze)) + return false; + if (!this.aze.equals(that.aze)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + int hashCode = 1; + + hashCode = hashCode * 8191 + ((success) ? 131071 : 524287); + + hashCode = hashCode * 8191 + ((is_set_aze()) ? 131071 : 524287); + if (is_set_aze()) + hashCode = hashCode * 8191 + aze.hashCode(); + + return hashCode; + } + + @Override + public int compareTo(isRemoteBlobExists_result other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = java.lang.Boolean.valueOf(is_set_success()).compareTo(other.is_set_success()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_success()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.success, other.success); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = java.lang.Boolean.valueOf(is_set_aze()).compareTo(other.is_set_aze()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_aze()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.aze, other.aze); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + scheme(iprot).read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + scheme(oprot).write(oprot, this); + } + + @Override + public java.lang.String toString() { + java.lang.StringBuilder sb = new java.lang.StringBuilder("isRemoteBlobExists_result("); + boolean first = true; + + sb.append("success:"); + sb.append(this.success); + first = false; + if (!first) sb.append(", "); + sb.append("aze:"); + if (this.aze == null) { + sb.append("null"); + } else { + sb.append(this.aze); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { + try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. + __isset_bitfield = 0; + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class isRemoteBlobExists_resultStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public isRemoteBlobExists_resultStandardScheme getScheme() { + return new isRemoteBlobExists_resultStandardScheme(); + } + } + + private static class isRemoteBlobExists_resultStandardScheme extends org.apache.thrift.scheme.StandardScheme<isRemoteBlobExists_result> { + + public void read(org.apache.thrift.protocol.TProtocol iprot, isRemoteBlobExists_result struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 0: // SUCCESS + if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) { + struct.success = iprot.readBool(); + struct.set_success_isSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 1: // AZE + if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { + struct.aze = new AuthorizationException(); + struct.aze.read(iprot); + struct.set_aze_isSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, isRemoteBlobExists_result struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.is_set_success()) { + oprot.writeFieldBegin(SUCCESS_FIELD_DESC); + oprot.writeBool(struct.success); + oprot.writeFieldEnd(); + } + if (struct.aze != null) { + oprot.writeFieldBegin(AZE_FIELD_DESC); + struct.aze.write(oprot); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class isRemoteBlobExists_resultTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public isRemoteBlobExists_resultTupleScheme getScheme() { + return new isRemoteBlobExists_resultTupleScheme(); + } + } + + private static class isRemoteBlobExists_resultTupleScheme extends org.apache.thrift.scheme.TupleScheme<isRemoteBlobExists_result> { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, isRemoteBlobExists_result struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet optionals = new java.util.BitSet(); + if (struct.is_set_success()) { + optionals.set(0); + } + if (struct.is_set_aze()) { + optionals.set(1); + } + oprot.writeBitSet(optionals, 2); + if (struct.is_set_success()) { + oprot.writeBool(struct.success); + } + if (struct.is_set_aze()) { + struct.aze.write(oprot); + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, isRemoteBlobExists_result struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet incoming = iprot.readBitSet(2); + if (incoming.get(0)) { + struct.success = iprot.readBool(); + struct.set_success_isSet(true); + } + if (incoming.get(1)) { + struct.aze = new AuthorizationException(); + struct.aze.read(iprot); + struct.set_aze_isSet(true); + } + } + } + + private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) { + return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); + } + } + } http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-client/src/py/storm/Nimbus-remote ---------------------------------------------------------------------- diff --git a/storm-client/src/py/storm/Nimbus-remote b/storm-client/src/py/storm/Nimbus-remote index 280c42b..3c35d1d 100644 --- a/storm-client/src/py/storm/Nimbus-remote +++ b/storm-client/src/py/storm/Nimbus-remote @@ -92,6 +92,7 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help': print(' void sendSupervisorWorkerHeartbeats(SupervisorWorkerHeartbeats heartbeats)') print(' void sendSupervisorWorkerHeartbeat(SupervisorWorkerHeartbeat heatbeat)') print(' void processWorkerMetrics(WorkerMetrics metrics)') + print(' bool isRemoteBlobExists(string blobKey)') print('') sys.exit(0) @@ -471,6 +472,12 @@ elif cmd == 'processWorkerMetrics': sys.exit(1) pp.pprint(client.processWorkerMetrics(eval(args[0]),)) +elif cmd == 'isRemoteBlobExists': + if len(args) != 1: + print('isRemoteBlobExists requires 1 args') + sys.exit(1) + pp.pprint(client.isRemoteBlobExists(args[0],)) + else: print('Unrecognized method %s' % cmd) sys.exit(1) http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-client/src/py/storm/Nimbus.py ---------------------------------------------------------------------- diff --git a/storm-client/src/py/storm/Nimbus.py b/storm-client/src/py/storm/Nimbus.py index ad16141..182dbe2 100644 --- a/storm-client/src/py/storm/Nimbus.py +++ b/storm-client/src/py/storm/Nimbus.py @@ -417,6 +417,15 @@ class Iface(object): """ pass + def isRemoteBlobExists(self, blobKey): + """ + Decide if the blob is removed from cluster. + + Parameters: + - blobKey + """ + pass + class Client(Iface): def __init__(self, iprot, oprot=None): @@ -2133,6 +2142,41 @@ class Client(Iface): iprot.readMessageEnd() return + def isRemoteBlobExists(self, blobKey): + """ + Decide if the blob is removed from cluster. + + Parameters: + - blobKey + """ + self.send_isRemoteBlobExists(blobKey) + return self.recv_isRemoteBlobExists() + + def send_isRemoteBlobExists(self, blobKey): + self._oprot.writeMessageBegin('isRemoteBlobExists', TMessageType.CALL, self._seqid) + args = isRemoteBlobExists_args() + args.blobKey = blobKey + args.write(self._oprot) + self._oprot.writeMessageEnd() + self._oprot.trans.flush() + + def recv_isRemoteBlobExists(self): + iprot = self._iprot + (fname, mtype, rseqid) = iprot.readMessageBegin() + if mtype == TMessageType.EXCEPTION: + x = TApplicationException() + x.read(iprot) + iprot.readMessageEnd() + raise x + result = isRemoteBlobExists_result() + result.read(iprot) + iprot.readMessageEnd() + if result.success is not None: + return result.success + if result.aze is not None: + raise result.aze + raise TApplicationException(TApplicationException.MISSING_RESULT, "isRemoteBlobExists failed: unknown result") + class Processor(Iface, TProcessor): def __init__(self, handler): @@ -2188,6 +2232,7 @@ class Processor(Iface, TProcessor): self._processMap["sendSupervisorWorkerHeartbeats"] = Processor.process_sendSupervisorWorkerHeartbeats self._processMap["sendSupervisorWorkerHeartbeat"] = Processor.process_sendSupervisorWorkerHeartbeat self._processMap["processWorkerMetrics"] = Processor.process_processWorkerMetrics + self._processMap["isRemoteBlobExists"] = Processor.process_isRemoteBlobExists def process(self, iprot, oprot): (name, type, seqid) = iprot.readMessageBegin() @@ -3573,6 +3618,32 @@ class Processor(Iface, TProcessor): oprot.writeMessageEnd() oprot.trans.flush() + def process_isRemoteBlobExists(self, seqid, iprot, oprot): + args = isRemoteBlobExists_args() + args.read(iprot) + iprot.readMessageEnd() + result = isRemoteBlobExists_result() + try: + result.success = self._handler.isRemoteBlobExists(args.blobKey) + msg_type = TMessageType.REPLY + except TTransport.TTransportException: + raise + except AuthorizationException as aze: + msg_type = TMessageType.REPLY + result.aze = aze + except TApplicationException as ex: + logging.exception('TApplication exception in handler') + msg_type = TMessageType.EXCEPTION + result = ex + except Exception: + logging.exception('Unexpected exception in handler') + msg_type = TMessageType.EXCEPTION + result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error') + oprot.writeMessageBegin("isRemoteBlobExists", msg_type, seqid) + result.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + # HELPER FUNCTIONS AND STRUCTURES @@ -10642,6 +10713,140 @@ class processWorkerMetrics_result(object): all_structs.append(processWorkerMetrics_result) processWorkerMetrics_result.thrift_spec = ( ) + + +class isRemoteBlobExists_args(object): + """ + Attributes: + - blobKey + """ + + + def __init__(self, blobKey=None,): + self.blobKey = blobKey + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, [self.__class__, self.thrift_spec]) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.blobKey = iprot.readString().decode('utf-8') if sys.version_info[0] == 2 else iprot.readString() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, [self.__class__, self.thrift_spec])) + return + oprot.writeStructBegin('isRemoteBlobExists_args') + if self.blobKey is not None: + oprot.writeFieldBegin('blobKey', TType.STRING, 1) + oprot.writeString(self.blobKey.encode('utf-8') if sys.version_info[0] == 2 else self.blobKey) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) +all_structs.append(isRemoteBlobExists_args) +isRemoteBlobExists_args.thrift_spec = ( + None, # 0 + (1, TType.STRING, 'blobKey', 'UTF8', None, ), # 1 +) + + +class isRemoteBlobExists_result(object): + """ + Attributes: + - success + - aze + """ + + + def __init__(self, success=None, aze=None,): + self.success = success + self.aze = aze + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, [self.__class__, self.thrift_spec]) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 0: + if ftype == TType.BOOL: + self.success = iprot.readBool() + else: + iprot.skip(ftype) + elif fid == 1: + if ftype == TType.STRUCT: + self.aze = AuthorizationException() + self.aze.read(iprot) + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, [self.__class__, self.thrift_spec])) + return + oprot.writeStructBegin('isRemoteBlobExists_result') + if self.success is not None: + oprot.writeFieldBegin('success', TType.BOOL, 0) + oprot.writeBool(self.success) + oprot.writeFieldEnd() + if self.aze is not None: + oprot.writeFieldBegin('aze', TType.STRUCT, 1) + self.aze.write(oprot) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) +all_structs.append(isRemoteBlobExists_result) +isRemoteBlobExists_result.thrift_spec = ( + (0, TType.BOOL, 'success', None, None, ), # 0 + (1, TType.STRUCT, 'aze', [AuthorizationException, None], None, ), # 1 +) fix_spec(all_structs) del all_structs http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-client/src/storm.thrift ---------------------------------------------------------------------- diff --git a/storm-client/src/storm.thrift b/storm-client/src/storm.thrift index e0935bd..fefc6db 100644 --- a/storm-client/src/storm.thrift +++ b/storm-client/src/storm.thrift @@ -797,6 +797,10 @@ service Nimbus { */ void sendSupervisorWorkerHeartbeat(1: SupervisorWorkerHeartbeat heatbeat) throws (1: AuthorizationException aze, 2: NotAliveException e); void processWorkerMetrics(1: WorkerMetrics metrics); + /** + * Decide if the blob is removed from cluster. + */ + bool isRemoteBlobExists(1: string blobKey) throws (1: AuthorizationException aze); } struct DRPCRequest { http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-client/test/jvm/org/apache/storm/blobstore/ClientBlobStoreTest.java ---------------------------------------------------------------------- diff --git a/storm-client/test/jvm/org/apache/storm/blobstore/ClientBlobStoreTest.java b/storm-client/test/jvm/org/apache/storm/blobstore/ClientBlobStoreTest.java index b1a9b0f..081cc7b 100644 --- a/storm-client/test/jvm/org/apache/storm/blobstore/ClientBlobStoreTest.java +++ b/storm-client/test/jvm/org/apache/storm/blobstore/ClientBlobStoreTest.java @@ -138,6 +138,11 @@ public class ClientBlobStoreTest { } @Override + public boolean isRemoteBlobExists(String blobKey) throws AuthorizationException { + return allBlobs.containsKey(blobKey); + } + + @Override protected void setBlobMetaToExtend(String key, SettableBlobMeta meta) throws AuthorizationException, KeyNotFoundException { } http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-server/src/main/java/org/apache/storm/LocalCluster.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java index 5f7d064..634ca35 100644 --- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java +++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java @@ -483,6 +483,11 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface { } @Override + public boolean isRemoteBlobExists(String blobKey) throws AuthorizationException, TException { + throw new KeyNotFoundException("BLOBS NOT SUPPORTED IN LOCAL MODE"); + } + + @Override public synchronized void close() throws Exception { if (nimbusOverride != null) { nimbusOverride.close(); http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java index be4360e..0c27268 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java @@ -4472,6 +4472,16 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { } } + @Override + public boolean isRemoteBlobExists(String blobKey) throws AuthorizationException, TException { + try { + blobStore.getBlobMeta(blobKey, getSubject()); + } catch (KeyNotFoundException e) { + return false; + } + return true; + } + private static final class Assoc<K, V> implements UnaryOperator<Map<K, V>> { private final K key; private final V value; http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java index 15eba02..fe5246e 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java @@ -88,17 +88,17 @@ public class LocalizedResourceRetentionSet { Map.Entry<LocallyCachedBlob, Map<String, ? extends LocallyCachedBlob>> rsrc = i.next(); LocallyCachedBlob resource = rsrc.getKey(); try { - resource.getRemoteVersion(store); + if (!store.isRemoteBlobExists(resource.getKey())) { + //The key was removed so we should delete it too. + Map<String, ? extends LocallyCachedBlob> set = rsrc.getValue(); + if (removeBlob(resource, set)) { + bytesOver -= resource.getSizeOnDisk(); + LOG.info("Deleted blob: {} (REMOVED FROM CLUSTER).", resource.getKey()); + i.remove(); + } + } } catch (AuthorizationException e) { //Ignored - } catch (KeyNotFoundException e) { - //The key was removed so we should delete it too. - Map<String, ? extends LocallyCachedBlob> set = rsrc.getValue(); - if (removeBlob(resource, set)) { - bytesOver -= resource.getSizeOnDisk(); - LOG.info("Deleted blob: {} (KEY NOT FOUND).", resource.getKey()); - i.remove(); - } } } http://git-wip-us.apache.org/repos/asf/storm/blob/26c4bec9/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java index e0a8275..938301f 100644 --- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java +++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java @@ -597,6 +597,7 @@ public class AsyncLocalizerTest { ReadableBlobMeta rbm = new ReadableBlobMeta(); rbm.set_settable(new SettableBlobMeta(WORLD_EVERYTHING)); when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm); + when(mockblobstore.isRemoteBlobExists(anyString())).thenReturn(true); when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(0)); when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta(0)); when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta(0));
