This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new 75274ab Fixed wait that was causing consistent dead compaction false positives. 75274ab is described below commit 75274ab29b5399788a8a61e4973acde3defa74bf Author: Keith Turner <ktur...@apache.org> AuthorDate: Mon Apr 5 17:27:47 2021 -0400 Fixed wait that was causing consistent dead compaction false positives. --- .../accumulo/core/compaction/thrift/Compactor.java | 1118 ++++++++++++++++++++ core/src/main/thrift/compaction-coordinator.thrift | 8 + .../server/compaction/ExternalCompactionUtil.java | 52 +- .../coordinator/CompactionCoordinator.java | 5 +- .../accumulo/coordinator/CompactionFinalizer.java | 12 +- .../coordinator/DeadCompactionDetector.java | 67 +- .../org/apache/accumulo/compactor/Compactor.java | 49 +- .../tserver/compactions/CompactionManager.java | 4 + 8 files changed, 1236 insertions(+), 79 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/thrift/Compactor.java b/core/src/main/java/org/apache/accumulo/core/compaction/thrift/Compactor.java index 94f8dea..6520e71 100644 --- a/core/src/main/java/org/apache/accumulo/core/compaction/thrift/Compactor.java +++ b/core/src/main/java/org/apache/accumulo/core/compaction/thrift/Compactor.java @@ -33,6 +33,8 @@ public class Compactor { public org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob getRunningCompaction(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials) throws org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException, org.apache.thrift.TException; + public java.lang.String getRunningCompactionId(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials) throws org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException, org.apache.thrift.TException; + } public interface AsyncIface { @@ -41,6 +43,8 @@ public class Compactor { public void getRunningCompaction(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob> resultHandler) throws org.apache.thrift.TException; + public void getRunningCompactionId(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<java.lang.String> resultHandler) throws org.apache.thrift.TException; + } public static class Client extends org.apache.thrift.TServiceClient implements Iface { @@ -115,6 +119,33 @@ public class Compactor { throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "getRunningCompaction failed: unknown result"); } + public java.lang.String getRunningCompactionId(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials) throws org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException, org.apache.thrift.TException + { + send_getRunningCompactionId(tinfo, credentials); + return recv_getRunningCompactionId(); + } + + public void send_getRunningCompactionId(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials) throws org.apache.thrift.TException + { + getRunningCompactionId_args args = new getRunningCompactionId_args(); + args.setTinfo(tinfo); + args.setCredentials(credentials); + sendBase("getRunningCompactionId", args); + } + + public java.lang.String recv_getRunningCompactionId() throws org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException, org.apache.thrift.TException + { + getRunningCompactionId_result result = new getRunningCompactionId_result(); + receiveBase(result, "getRunningCompactionId"); + if (result.isSetSuccess()) { + return result.success; + } + if (result.sec != null) { + throw result.sec; + } + throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "getRunningCompactionId failed: unknown result"); + } + } public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface { public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> { @@ -206,6 +237,41 @@ public class Compactor { } } + public void getRunningCompactionId(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<java.lang.String> resultHandler) throws org.apache.thrift.TException { + checkReady(); + getRunningCompactionId_call method_call = new getRunningCompactionId_call(tinfo, credentials, resultHandler, this, ___protocolFactory, ___transport); + this.___currentMethod = method_call; + ___manager.call(method_call); + } + + public static class getRunningCompactionId_call extends org.apache.thrift.async.TAsyncMethodCall<java.lang.String> { + private org.apache.accumulo.core.trace.thrift.TInfo tinfo; + private org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials; + public getRunningCompactionId_call(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<java.lang.String> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { + super(client, protocolFactory, transport, resultHandler, false); + this.tinfo = tinfo; + this.credentials = credentials; + } + + public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { + prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("getRunningCompactionId", org.apache.thrift.protocol.TMessageType.CALL, 0)); + getRunningCompactionId_args args = new getRunningCompactionId_args(); + args.setTinfo(tinfo); + args.setCredentials(credentials); + args.write(prot); + prot.writeMessageEnd(); + } + + public java.lang.String getResult() throws org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException, org.apache.thrift.TException { + if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) { + throw new java.lang.IllegalStateException("Method call not finished!"); + } + org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array()); + org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport); + return (new Client(prot)).recv_getRunningCompactionId(); + } + } + } public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor { @@ -221,6 +287,7 @@ public class Compactor { private static <I extends Iface> java.util.Map<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(java.util.Map<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) { processMap.put("cancel", new cancel()); processMap.put("getRunningCompaction", new getRunningCompaction()); + processMap.put("getRunningCompactionId", new getRunningCompactionId()); return processMap; } @@ -282,6 +349,35 @@ public class Compactor { } } + public static class getRunningCompactionId<I extends Iface> extends org.apache.thrift.ProcessFunction<I, getRunningCompactionId_args> { + public getRunningCompactionId() { + super("getRunningCompactionId"); + } + + public getRunningCompactionId_args getEmptyArgsInstance() { + return new getRunningCompactionId_args(); + } + + protected boolean isOneway() { + return false; + } + + @Override + protected boolean rethrowUnhandledExceptions() { + return false; + } + + public getRunningCompactionId_result getResult(I iface, getRunningCompactionId_args args) throws org.apache.thrift.TException { + getRunningCompactionId_result result = new getRunningCompactionId_result(); + try { + result.success = iface.getRunningCompactionId(args.tinfo, args.credentials); + } catch (org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException sec) { + result.sec = sec; + } + return result; + } + } + } public static class AsyncProcessor<I extends AsyncIface> extends org.apache.thrift.TBaseAsyncProcessor<I> { @@ -297,6 +393,7 @@ public class Compactor { private static <I extends AsyncIface> java.util.Map<java.lang.String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase,?>> getProcessMap(java.util.Map<java.lang.String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>> processMap) { processMap.put("cancel", new cancel()); processMap.put("getRunningCompaction", new getRunningCompaction()); + processMap.put("getRunningCompactionId", new getRunningCompactionId()); return processMap; } @@ -429,6 +526,71 @@ public class Compactor { } } + public static class getRunningCompactionId<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getRunningCompactionId_args, java.lang.String> { + public getRunningCompactionId() { + super("getRunningCompactionId"); + } + + public getRunningCompactionId_args getEmptyArgsInstance() { + return new getRunningCompactionId_args(); + } + + public org.apache.thrift.async.AsyncMethodCallback<java.lang.String> getResultHandler(final org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) { + final org.apache.thrift.AsyncProcessFunction fcall = this; + return new org.apache.thrift.async.AsyncMethodCallback<java.lang.String>() { + public void onComplete(java.lang.String o) { + getRunningCompactionId_result result = new getRunningCompactionId_result(); + result.success = o; + try { + fcall.sendResponse(fb, result, org.apache.thrift.protocol.TMessageType.REPLY,seqid); + } catch (org.apache.thrift.transport.TTransportException e) { + _LOGGER.error("TTransportException writing to internal frame buffer", e); + fb.close(); + } catch (java.lang.Exception e) { + _LOGGER.error("Exception writing to internal frame buffer", e); + onError(e); + } + } + public void onError(java.lang.Exception e) { + byte msgType = org.apache.thrift.protocol.TMessageType.REPLY; + org.apache.thrift.TSerializable msg; + getRunningCompactionId_result result = new getRunningCompactionId_result(); + if (e instanceof org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException) { + result.sec = (org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException) e; + result.setSecIsSet(true); + msg = result; + } else if (e instanceof org.apache.thrift.transport.TTransportException) { + _LOGGER.error("TTransportException inside handler", e); + fb.close(); + return; + } else if (e instanceof org.apache.thrift.TApplicationException) { + _LOGGER.error("TApplicationException inside handler", e); + msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION; + msg = (org.apache.thrift.TApplicationException)e; + } else { + _LOGGER.error("Exception inside handler", e); + msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION; + msg = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage()); + } + try { + fcall.sendResponse(fb,msg,msgType,seqid); + } catch (java.lang.Exception ex) { + _LOGGER.error("Exception writing to internal frame buffer", ex); + fb.close(); + } + } + }; + } + + protected boolean isOneway() { + return false; + } + + public void start(I iface, getRunningCompactionId_args args, org.apache.thrift.async.AsyncMethodCallback<java.lang.String> resultHandler) throws org.apache.thrift.TException { + iface.getRunningCompactionId(args.tinfo, args.credentials,resultHandler); + } + } + } public static class cancel_args implements org.apache.thrift.TBase<cancel_args, cancel_args._Fields>, java.io.Serializable, Cloneable, Comparable<cancel_args> { @@ -2348,5 +2510,961 @@ public class Compactor { } } + public static class getRunningCompactionId_args implements org.apache.thrift.TBase<getRunningCompactionId_args, getRunningCompactionId_args._Fields>, java.io.Serializable, Cloneable, Comparable<getRunningCompactionId_args> { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("getRunningCompactionId_args"); + + private static final org.apache.thrift.protocol.TField TINFO_FIELD_DESC = new org.apache.thrift.protocol.TField("tinfo", org.apache.thrift.protocol.TType.STRUCT, (short)1); + private static final org.apache.thrift.protocol.TField CREDENTIALS_FIELD_DESC = new org.apache.thrift.protocol.TField("credentials", org.apache.thrift.protocol.TType.STRUCT, (short)2); + + private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new getRunningCompactionId_argsStandardSchemeFactory(); + private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new getRunningCompactionId_argsTupleSchemeFactory(); + + public @org.apache.thrift.annotation.Nullable org.apache.accumulo.core.trace.thrift.TInfo tinfo; // required + public @org.apache.thrift.annotation.Nullable org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + TINFO((short)1, "tinfo"), + CREDENTIALS((short)2, "credentials"); + + private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); + + static { + for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + @org.apache.thrift.annotation.Nullable + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // TINFO + return TINFO; + case 2: // CREDENTIALS + return CREDENTIALS; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + @org.apache.thrift.annotation.Nullable + public static _Fields findByName(java.lang.String name) { + return byName.get(name); + } + + private final short _thriftId; + private final java.lang.String _fieldName; + + _Fields(short thriftId, java.lang.String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public java.lang.String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.TINFO, new org.apache.thrift.meta_data.FieldMetaData("tinfo", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.trace.thrift.TInfo.class))); + tmpMap.put(_Fields.CREDENTIALS, new org.apache.thrift.meta_data.FieldMetaData("credentials", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.securityImpl.thrift.TCredentials.class))); + metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(getRunningCompactionId_args.class, metaDataMap); + } + + public getRunningCompactionId_args() { + } + + public getRunningCompactionId_args( + org.apache.accumulo.core.trace.thrift.TInfo tinfo, + org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials) + { + this(); + this.tinfo = tinfo; + this.credentials = credentials; + } + + /** + * Performs a deep copy on <i>other</i>. + */ + public getRunningCompactionId_args(getRunningCompactionId_args other) { + if (other.isSetTinfo()) { + this.tinfo = new org.apache.accumulo.core.trace.thrift.TInfo(other.tinfo); + } + if (other.isSetCredentials()) { + this.credentials = new org.apache.accumulo.core.securityImpl.thrift.TCredentials(other.credentials); + } + } + + public getRunningCompactionId_args deepCopy() { + return new getRunningCompactionId_args(this); + } + + @Override + public void clear() { + this.tinfo = null; + this.credentials = null; + } + + @org.apache.thrift.annotation.Nullable + public org.apache.accumulo.core.trace.thrift.TInfo getTinfo() { + return this.tinfo; + } + + public getRunningCompactionId_args setTinfo(@org.apache.thrift.annotation.Nullable org.apache.accumulo.core.trace.thrift.TInfo tinfo) { + this.tinfo = tinfo; + return this; + } + + public void unsetTinfo() { + this.tinfo = null; + } + + /** Returns true if field tinfo is set (has been assigned a value) and false otherwise */ + public boolean isSetTinfo() { + return this.tinfo != null; + } + + public void setTinfoIsSet(boolean value) { + if (!value) { + this.tinfo = null; + } + } + + @org.apache.thrift.annotation.Nullable + public org.apache.accumulo.core.securityImpl.thrift.TCredentials getCredentials() { + return this.credentials; + } + + public getRunningCompactionId_args setCredentials(@org.apache.thrift.annotation.Nullable org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials) { + this.credentials = credentials; + return this; + } + + public void unsetCredentials() { + this.credentials = null; + } + + /** Returns true if field credentials is set (has been assigned a value) and false otherwise */ + public boolean isSetCredentials() { + return this.credentials != null; + } + + public void setCredentialsIsSet(boolean value) { + if (!value) { + this.credentials = null; + } + } + + public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { + switch (field) { + case TINFO: + if (value == null) { + unsetTinfo(); + } else { + setTinfo((org.apache.accumulo.core.trace.thrift.TInfo)value); + } + break; + + case CREDENTIALS: + if (value == null) { + unsetCredentials(); + } else { + setCredentials((org.apache.accumulo.core.securityImpl.thrift.TCredentials)value); + } + break; + + } + } + + @org.apache.thrift.annotation.Nullable + public java.lang.Object getFieldValue(_Fields field) { + switch (field) { + case TINFO: + return getTinfo(); + + case CREDENTIALS: + return getCredentials(); + + } + throw new java.lang.IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new java.lang.IllegalArgumentException(); + } + + switch (field) { + case TINFO: + return isSetTinfo(); + case CREDENTIALS: + return isSetCredentials(); + } + throw new java.lang.IllegalStateException(); + } + + @Override + public boolean equals(java.lang.Object that) { + if (that == null) + return false; + if (that instanceof getRunningCompactionId_args) + return this.equals((getRunningCompactionId_args)that); + return false; + } + + public boolean equals(getRunningCompactionId_args that) { + if (that == null) + return false; + if (this == that) + return true; + + boolean this_present_tinfo = true && this.isSetTinfo(); + boolean that_present_tinfo = true && that.isSetTinfo(); + if (this_present_tinfo || that_present_tinfo) { + if (!(this_present_tinfo && that_present_tinfo)) + return false; + if (!this.tinfo.equals(that.tinfo)) + return false; + } + + boolean this_present_credentials = true && this.isSetCredentials(); + boolean that_present_credentials = true && that.isSetCredentials(); + if (this_present_credentials || that_present_credentials) { + if (!(this_present_credentials && that_present_credentials)) + return false; + if (!this.credentials.equals(that.credentials)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + int hashCode = 1; + + hashCode = hashCode * 8191 + ((isSetTinfo()) ? 131071 : 524287); + if (isSetTinfo()) + hashCode = hashCode * 8191 + tinfo.hashCode(); + + hashCode = hashCode * 8191 + ((isSetCredentials()) ? 131071 : 524287); + if (isSetCredentials()) + hashCode = hashCode * 8191 + credentials.hashCode(); + + return hashCode; + } + + @Override + public int compareTo(getRunningCompactionId_args other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = java.lang.Boolean.valueOf(isSetTinfo()).compareTo(other.isSetTinfo()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetTinfo()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.tinfo, other.tinfo); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = java.lang.Boolean.valueOf(isSetCredentials()).compareTo(other.isSetCredentials()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetCredentials()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.credentials, other.credentials); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + @org.apache.thrift.annotation.Nullable + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + scheme(iprot).read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + scheme(oprot).write(oprot, this); + } + + @Override + public java.lang.String toString() { + java.lang.StringBuilder sb = new java.lang.StringBuilder("getRunningCompactionId_args("); + boolean first = true; + + sb.append("tinfo:"); + if (this.tinfo == null) { + sb.append("null"); + } else { + sb.append(this.tinfo); + } + first = false; + if (!first) sb.append(", "); + sb.append("credentials:"); + if (this.credentials == null) { + sb.append("null"); + } else { + sb.append(this.credentials); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + if (tinfo != null) { + tinfo.validate(); + } + if (credentials != null) { + credentials.validate(); + } + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class getRunningCompactionId_argsStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public getRunningCompactionId_argsStandardScheme getScheme() { + return new getRunningCompactionId_argsStandardScheme(); + } + } + + private static class getRunningCompactionId_argsStandardScheme extends org.apache.thrift.scheme.StandardScheme<getRunningCompactionId_args> { + + public void read(org.apache.thrift.protocol.TProtocol iprot, getRunningCompactionId_args struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // TINFO + if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { + struct.tinfo = new org.apache.accumulo.core.trace.thrift.TInfo(); + struct.tinfo.read(iprot); + struct.setTinfoIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 2: // CREDENTIALS + if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { + struct.credentials = new org.apache.accumulo.core.securityImpl.thrift.TCredentials(); + struct.credentials.read(iprot); + struct.setCredentialsIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, getRunningCompactionId_args struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.tinfo != null) { + oprot.writeFieldBegin(TINFO_FIELD_DESC); + struct.tinfo.write(oprot); + oprot.writeFieldEnd(); + } + if (struct.credentials != null) { + oprot.writeFieldBegin(CREDENTIALS_FIELD_DESC); + struct.credentials.write(oprot); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class getRunningCompactionId_argsTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public getRunningCompactionId_argsTupleScheme getScheme() { + return new getRunningCompactionId_argsTupleScheme(); + } + } + + private static class getRunningCompactionId_argsTupleScheme extends org.apache.thrift.scheme.TupleScheme<getRunningCompactionId_args> { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, getRunningCompactionId_args struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet optionals = new java.util.BitSet(); + if (struct.isSetTinfo()) { + optionals.set(0); + } + if (struct.isSetCredentials()) { + optionals.set(1); + } + oprot.writeBitSet(optionals, 2); + if (struct.isSetTinfo()) { + struct.tinfo.write(oprot); + } + if (struct.isSetCredentials()) { + struct.credentials.write(oprot); + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, getRunningCompactionId_args struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet incoming = iprot.readBitSet(2); + if (incoming.get(0)) { + struct.tinfo = new org.apache.accumulo.core.trace.thrift.TInfo(); + struct.tinfo.read(iprot); + struct.setTinfoIsSet(true); + } + if (incoming.get(1)) { + struct.credentials = new org.apache.accumulo.core.securityImpl.thrift.TCredentials(); + struct.credentials.read(iprot); + struct.setCredentialsIsSet(true); + } + } + } + + private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) { + return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); + } + } + + public static class getRunningCompactionId_result implements org.apache.thrift.TBase<getRunningCompactionId_result, getRunningCompactionId_result._Fields>, java.io.Serializable, Cloneable, Comparable<getRunningCompactionId_result> { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("getRunningCompactionId_result"); + + private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.STRING, (short)0); + private static final org.apache.thrift.protocol.TField SEC_FIELD_DESC = new org.apache.thrift.protocol.TField("sec", org.apache.thrift.protocol.TType.STRUCT, (short)1); + + private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new getRunningCompactionId_resultStandardSchemeFactory(); + private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new getRunningCompactionId_resultTupleSchemeFactory(); + + public @org.apache.thrift.annotation.Nullable java.lang.String success; // required + public @org.apache.thrift.annotation.Nullable org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException sec; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + SUCCESS((short)0, "success"), + SEC((short)1, "sec"); + + private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); + + static { + for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + @org.apache.thrift.annotation.Nullable + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 0: // SUCCESS + return SUCCESS; + case 1: // SEC + return SEC; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + @org.apache.thrift.annotation.Nullable + public static _Fields findByName(java.lang.String name) { + return byName.get(name); + } + + private final short _thriftId; + private final java.lang.String _fieldName; + + _Fields(short thriftId, java.lang.String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public java.lang.String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.SEC, new org.apache.thrift.meta_data.FieldMetaData("sec", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException.class))); + metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(getRunningCompactionId_result.class, metaDataMap); + } + + public getRunningCompactionId_result() { + } + + public getRunningCompactionId_result( + java.lang.String success, + org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException sec) + { + this(); + this.success = success; + this.sec = sec; + } + + /** + * Performs a deep copy on <i>other</i>. + */ + public getRunningCompactionId_result(getRunningCompactionId_result other) { + if (other.isSetSuccess()) { + this.success = other.success; + } + if (other.isSetSec()) { + this.sec = new org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException(other.sec); + } + } + + public getRunningCompactionId_result deepCopy() { + return new getRunningCompactionId_result(this); + } + + @Override + public void clear() { + this.success = null; + this.sec = null; + } + + @org.apache.thrift.annotation.Nullable + public java.lang.String getSuccess() { + return this.success; + } + + public getRunningCompactionId_result setSuccess(@org.apache.thrift.annotation.Nullable java.lang.String success) { + this.success = success; + return this; + } + + public void unsetSuccess() { + this.success = null; + } + + /** Returns true if field success is set (has been assigned a value) and false otherwise */ + public boolean isSetSuccess() { + return this.success != null; + } + + public void setSuccessIsSet(boolean value) { + if (!value) { + this.success = null; + } + } + + @org.apache.thrift.annotation.Nullable + public org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException getSec() { + return this.sec; + } + + public getRunningCompactionId_result setSec(@org.apache.thrift.annotation.Nullable org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException sec) { + this.sec = sec; + return this; + } + + public void unsetSec() { + this.sec = null; + } + + /** Returns true if field sec is set (has been assigned a value) and false otherwise */ + public boolean isSetSec() { + return this.sec != null; + } + + public void setSecIsSet(boolean value) { + if (!value) { + this.sec = null; + } + } + + public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { + switch (field) { + case SUCCESS: + if (value == null) { + unsetSuccess(); + } else { + setSuccess((java.lang.String)value); + } + break; + + case SEC: + if (value == null) { + unsetSec(); + } else { + setSec((org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException)value); + } + break; + + } + } + + @org.apache.thrift.annotation.Nullable + public java.lang.Object getFieldValue(_Fields field) { + switch (field) { + case SUCCESS: + return getSuccess(); + + case SEC: + return getSec(); + + } + throw new java.lang.IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new java.lang.IllegalArgumentException(); + } + + switch (field) { + case SUCCESS: + return isSetSuccess(); + case SEC: + return isSetSec(); + } + throw new java.lang.IllegalStateException(); + } + + @Override + public boolean equals(java.lang.Object that) { + if (that == null) + return false; + if (that instanceof getRunningCompactionId_result) + return this.equals((getRunningCompactionId_result)that); + return false; + } + + public boolean equals(getRunningCompactionId_result that) { + if (that == null) + return false; + if (this == that) + return true; + + boolean this_present_success = true && this.isSetSuccess(); + boolean that_present_success = true && that.isSetSuccess(); + if (this_present_success || that_present_success) { + if (!(this_present_success && that_present_success)) + return false; + if (!this.success.equals(that.success)) + return false; + } + + boolean this_present_sec = true && this.isSetSec(); + boolean that_present_sec = true && that.isSetSec(); + if (this_present_sec || that_present_sec) { + if (!(this_present_sec && that_present_sec)) + return false; + if (!this.sec.equals(that.sec)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + int hashCode = 1; + + hashCode = hashCode * 8191 + ((isSetSuccess()) ? 131071 : 524287); + if (isSetSuccess()) + hashCode = hashCode * 8191 + success.hashCode(); + + hashCode = hashCode * 8191 + ((isSetSec()) ? 131071 : 524287); + if (isSetSec()) + hashCode = hashCode * 8191 + sec.hashCode(); + + return hashCode; + } + + @Override + public int compareTo(getRunningCompactionId_result other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = java.lang.Boolean.valueOf(isSetSuccess()).compareTo(other.isSetSuccess()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetSuccess()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.success, other.success); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = java.lang.Boolean.valueOf(isSetSec()).compareTo(other.isSetSec()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetSec()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.sec, other.sec); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + @org.apache.thrift.annotation.Nullable + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + scheme(iprot).read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + scheme(oprot).write(oprot, this); + } + + @Override + public java.lang.String toString() { + java.lang.StringBuilder sb = new java.lang.StringBuilder("getRunningCompactionId_result("); + boolean first = true; + + sb.append("success:"); + if (this.success == null) { + sb.append("null"); + } else { + sb.append(this.success); + } + first = false; + if (!first) sb.append(", "); + sb.append("sec:"); + if (this.sec == null) { + sb.append("null"); + } else { + sb.append(this.sec); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class getRunningCompactionId_resultStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public getRunningCompactionId_resultStandardScheme getScheme() { + return new getRunningCompactionId_resultStandardScheme(); + } + } + + private static class getRunningCompactionId_resultStandardScheme extends org.apache.thrift.scheme.StandardScheme<getRunningCompactionId_result> { + + public void read(org.apache.thrift.protocol.TProtocol iprot, getRunningCompactionId_result struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 0: // SUCCESS + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.success = iprot.readString(); + struct.setSuccessIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 1: // SEC + if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { + struct.sec = new org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException(); + struct.sec.read(iprot); + struct.setSecIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, getRunningCompactionId_result struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.success != null) { + oprot.writeFieldBegin(SUCCESS_FIELD_DESC); + oprot.writeString(struct.success); + oprot.writeFieldEnd(); + } + if (struct.sec != null) { + oprot.writeFieldBegin(SEC_FIELD_DESC); + struct.sec.write(oprot); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class getRunningCompactionId_resultTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public getRunningCompactionId_resultTupleScheme getScheme() { + return new getRunningCompactionId_resultTupleScheme(); + } + } + + private static class getRunningCompactionId_resultTupleScheme extends org.apache.thrift.scheme.TupleScheme<getRunningCompactionId_result> { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, getRunningCompactionId_result struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet optionals = new java.util.BitSet(); + if (struct.isSetSuccess()) { + optionals.set(0); + } + if (struct.isSetSec()) { + optionals.set(1); + } + oprot.writeBitSet(optionals, 2); + if (struct.isSetSuccess()) { + oprot.writeString(struct.success); + } + if (struct.isSetSec()) { + struct.sec.write(oprot); + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, getRunningCompactionId_result struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet incoming = iprot.readBitSet(2); + if (incoming.get(0)) { + struct.success = iprot.readString(); + struct.setSuccessIsSet(true); + } + if (incoming.get(1)) { + struct.sec = new org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException(); + struct.sec.read(iprot); + struct.setSecIsSet(true); + } + } + } + + private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) { + return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); + } + } + private static void unusedMethod() {} } diff --git a/core/src/main/thrift/compaction-coordinator.thrift b/core/src/main/thrift/compaction-coordinator.thrift index 9a5fc15..1a43391 100644 --- a/core/src/main/thrift/compaction-coordinator.thrift +++ b/core/src/main/thrift/compaction-coordinator.thrift @@ -145,4 +145,12 @@ service Compactor { ) throws ( 1:client.ThriftSecurityException sec ) + + string getRunningCompactionId( + 1:trace.TInfo tinfo + 2:security.TCredentials credentials + ) throws ( + 1:client.ThriftSecurityException sec + ) + } diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java index 6ec94ac..f5b48ab 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java @@ -19,7 +19,9 @@ package org.apache.accumulo.server.compaction; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -28,6 +30,7 @@ import java.util.concurrent.Future; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.compaction.thrift.Compactor; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.trace.TraceUtil; @@ -119,7 +122,7 @@ public class ExternalCompactionUtil { /** * Get the compaction currently running on the Compactor - * + * * @param compactorAddr * compactor address * @param context @@ -147,6 +150,24 @@ public class ExternalCompactionUtil { return null; } + private static ExternalCompactionId getRunningCompactionId(HostAndPort compactorAddr, + ServerContext context) { + Compactor.Client client = null; + try { + // CBUG should this retry? + client = ThriftUtil.getClient(new Compactor.Client.Factory(), compactorAddr, context); + String secid = client.getRunningCompactionId(TraceUtil.traceInfo(), context.rpcCreds()); + if (!secid.isEmpty()) { + return ExternalCompactionId.of(secid); + } + } catch (TException e) { + LOG.debug("Failed to contact compactor {}", compactorAddr, e); + } finally { + ThriftUtil.returnClient(client); + } + return null; + } + /** * @param context * server context @@ -181,4 +202,33 @@ public class ExternalCompactionUtil { return results; } + public static Collection<ExternalCompactionId> + getCompactionIdsRunningOnCompactors(ServerContext context) { + final ExecutorService executor = + ThreadPools.createFixedThreadPool(16, "CompactorRunningCompactions", false); + + List<Future<ExternalCompactionId>> futures = new ArrayList<>(); + + getCompactorAddrs(context).forEach(hp -> { + futures.add(executor.submit(() -> getRunningCompactionId(hp, context))); + }); + executor.shutdown(); + + HashSet<ExternalCompactionId> runningIds = new HashSet<>(); + + futures.forEach(future -> { + try { + ExternalCompactionId ceid = future.get(); + if (ceid != null) { + runningIds.add(ceid); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + }); + + return runningIds; + } } diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index fcb6461..d32f0b1 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -22,7 +22,6 @@ import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; @@ -61,7 +60,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.core.util.HostAndPort; -import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.fate.util.UtilWaitThread; import org.apache.accumulo.fate.zookeeper.ZooLock; @@ -646,8 +644,7 @@ public class CompactionCoordinator extends AbstractServer if (null != rc) { // CBUG: Should we remove rc from RUNNING here and remove the isCompactionCompleted method? rc.setCompleted(); - compactionFinalizer.failCompactions(Collections - .singleton(new Pair<ExternalCompactionId,KeyExtent>(ecid, KeyExtent.fromThrift(extent)))); + compactionFinalizer.failCompactions(Map.of(ecid, KeyExtent.fromThrift(extent))); } else { LOG.error( "Compaction failed called by Compactor for {}, but no running compaction for that id.", diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java index d41240b..2b968ba 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java @@ -21,7 +21,7 @@ package org.apache.accumulo.coordinator; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.Set; +import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; @@ -40,7 +40,6 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.fate.util.UtilWaitThread; import org.apache.accumulo.server.ServerContext; @@ -93,12 +92,11 @@ public class CompactionFinalizer { pendingNotifications.offer(ecfs); } - public void - failCompactions(Set<? extends Pair<ExternalCompactionId,KeyExtent>> compactionsToFail) { + public void failCompactions(Map<ExternalCompactionId,KeyExtent> compactionsToFail) { - var finalStates = - compactionsToFail.stream().map(ctf -> new ExternalCompactionFinalState(ctf.getFirst(), - ctf.getSecond(), FinalState.FAILED, 0L, 0L)).collect(Collectors.toList()); + var finalStates = compactionsToFail.entrySet().stream().map( + e -> new ExternalCompactionFinalState(e.getKey(), e.getValue(), FinalState.FAILED, 0L, 0L)) + .collect(Collectors.toList()); context.getAmple().putExternalCompactionFinalStates(finalStates); diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java index 3c52a8e..ed4419a 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java @@ -18,18 +18,14 @@ */ package org.apache.accumulo.coordinator; +import java.util.Collection; +import java.util.HashMap; import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; -import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; -import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; -import org.apache.accumulo.core.util.HostAndPort; -import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.fate.util.UtilWaitThread; import org.apache.accumulo.server.ServerContext; @@ -44,16 +40,6 @@ public class DeadCompactionDetector { private final ServerContext context; private final CompactionFinalizer finalizer; - private static class CompIdExtent extends Pair<ExternalCompactionId,KeyExtent> { - CompIdExtent(ExternalCompactionId ecid, KeyExtent extent) { - super(ecid, extent); - } - - public CompIdExtent(ExternalCompactionFinalState ecfs) { - super(ecfs.getExternalCompactionId(), ecfs.getExtent()); - } - } - public DeadCompactionDetector(ServerContext context, CompactionFinalizer finalizer) { this.context = context; this.finalizer = finalizer; @@ -65,50 +51,45 @@ public class DeadCompactionDetector { log.trace("Starting to look for dead compactions"); + Map<ExternalCompactionId,KeyExtent> tabletCompactions = new HashMap<>(); + // find what external compactions tablets think are running - Set<CompIdExtent> tabletCompactions = context.getAmple().readTablets().forLevel(DataLevel.USER) - .fetch(ColumnType.ECOMP, ColumnType.PREV_ROW).build().stream() - .flatMap(tm -> tm.getExternalCompactions().keySet().stream() - .map(ecid -> new CompIdExtent(ecid, tm.getExtent()))) - .collect(Collectors.toSet()); + context.getAmple().readTablets().forLevel(DataLevel.USER) + .fetch(ColumnType.ECOMP, ColumnType.PREV_ROW).build().forEach(tm -> { + tm.getExternalCompactions().keySet().forEach(ecid -> { + tabletCompactions.put(ecid, tm.getExtent()); + }); + }); if (tabletCompactions.isEmpty()) { // no need to look for dead compactions when tablets don't have anything recorded as running return; } - if(log.isTraceEnabled()) { - tabletCompactions.forEach(cie -> log.trace("Saw {} for {}", cie.getFirst(), cie.getSecond())); + if (log.isTraceEnabled()) { + tabletCompactions.forEach((ecid, extent) -> log.trace("Saw {} for {}", ecid, extent)); } // Determine what compactions are currently running and remove those. // // In order for this overall algorithm to be correct and avoid race conditions, the compactor - // must do two things when its asked what it is currently running. - // - // 1. If it is in the process of reserving a compaction, then it should wait for the - // reservation to complete before responding to this request. - // 2. If it is in the process of committing a compaction, then it should respond that it is - // running the compaction it is currently committing. - // - // If the two conditions above are not met, then legitimate running compactions could be - // canceled. - Map<HostAndPort,TExternalCompactionJob> running = - ExternalCompactionUtil.getCompactionsRunningOnCompactors(context); - running.forEach((k, v) -> { - CompIdExtent cie = new CompIdExtent(ExternalCompactionId.of(v.getExternalCompactionId()), - KeyExtent.fromThrift(v.getExtent())); - if(tabletCompactions.remove(cie)) { - log.trace("Removed {} running on a compactor", cie.getFirst()); + // must return ids covering the time period from before reservation until after commit. If the + // ids do not cover this time period then legitimate running compactions could be canceled. + Collection<ExternalCompactionId> running = + ExternalCompactionUtil.getCompactionIdsRunningOnCompactors(context); + + running.forEach((ecid) -> { + if (tabletCompactions.remove(ecid) != null) { + log.trace("Removed {} running on a compactor", ecid); } }); // Determine which compactions are currently committing and remove those - context.getAmple().getExternalCompactionFinalStates().map(CompIdExtent::new) - .forEach(tabletCompactions::remove); + context.getAmple().getExternalCompactionFinalStates() + .map(ecfs -> ecfs.getExternalCompactionId()).forEach(tabletCompactions::remove); - tabletCompactions.forEach( - cid -> log.debug("Detected dead compaction {} {}", cid.getFirst(), cid.getSecond())); + tabletCompactions + .forEach((ecid, extent) -> log.debug("Detected dead compaction {} {}", ecid, extent)); // Everything left in tabletCompactions is no longer running anywhere and should be failed. // Its possible that a compaction committed while going through the steps above, if so then diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 8968e65..5bc0656 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -64,7 +64,6 @@ import org.apache.accumulo.core.util.ServerServices; import org.apache.accumulo.core.util.ServerServices.Service; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; -import org.apache.accumulo.fate.util.UtilWaitThread; import org.apache.accumulo.fate.zookeeper.ZooLock; import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason; import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher; @@ -703,35 +702,37 @@ public class Compactor extends AbstractServer SecurityErrorCode.PERMISSION_DENIED).asThriftException(); } - // CBUG need to ensure the following are met... also add these props as comments stating they - // must be met - // 1. This method will block if a compaction is currently being reserved until the reservation - // is complete - // 2. This method will report it is running a compaction while it is in the process of - // committing that compaction. + // Return what is currently running, does not wait for jobs in the process of reserving. This + // method is called by a coordinator starting up to determine what is currently running on all + // compactors. - ExternalCompactionId eci = currentCompactionId.get(); - if (null == eci) { + TExternalCompactionJob job = null; + synchronized (JOB_HOLDER) { + job = JOB_HOLDER.getJob(); + } + + if (null == job) { return new TExternalCompactionJob(); } else { - // Then we are trying to reserve an external compaction, but have not yet - // started it. Wait until it's set to return the job - TExternalCompactionJob job = null; - synchronized (JOB_HOLDER) { - job = JOB_HOLDER.getJob(); - } - while (null == job) { - // CBUG: It's possible that the call from the coordinator could - // be stuck here waiting for a compaction to be reserved and stall the - // DeadCompactionDetector from contacting other compactors. - UtilWaitThread.sleep(50); - synchronized (JOB_HOLDER) { - job = JOB_HOLDER.getJob(); - } - } return job; } } + @Override + public String getRunningCompactionId(TInfo tinfo, TCredentials credentials) + throws ThriftSecurityException, TException { + + // Any returned id must cover the time period from before a job is reserved until after it + // commits. This method is called to detect dead compactions and depends on this behavior. + // For the purpose of detecting dead compactions its ok if ids are returned that never end up + // being related to a running compaction. + ExternalCompactionId eci = currentCompactionId.get(); + if (null == eci) { + return ""; + } else { + return eci.canonical(); + } + } + } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java index 5584b61..e4a5596 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java @@ -455,6 +455,8 @@ public class CompactionManager { compactablesToCheck.add(tablet.asCompactable()); } runningExternalCompactions.remove(extCompactionId); + } else { + ctx.getAmple().deleteExternalCompactionFinalStates(List.of(extCompactionId)); } } @@ -473,6 +475,8 @@ public class CompactionManager { compactablesToCheck.add(tablet.asCompactable()); } runningExternalCompactions.remove(ecid); + } else { + ctx.getAmple().deleteExternalCompactionFinalStates(List.of(ecid)); } }