Repository: incubator-tephra Updated Branches: refs/heads/master 39f4fde63 -> 8532076f8
(TEPHRA-250) Allow to trigger transaction pruning This closes #52 Signed-off-by: anew <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/8532076f Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/8532076f Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/8532076f Branch: refs/heads/master Commit: 8532076f802508a4a761408a7501c82463dd3ad5 Parents: 39f4fde Author: anew <[email protected]> Authored: Fri Sep 8 16:36:29 2017 -0700 Committer: anew <[email protected]> Committed: Fri Sep 8 22:52:36 2017 -0700 ---------------------------------------------------------------------- .../apache/tephra/TransactionSystemClient.java | 5 + .../tephra/distributed/TransactionService.java | 28 +- .../distributed/TransactionServiceClient.java | 18 + .../TransactionServiceThriftClient.java | 9 + .../TransactionServiceThriftHandler.java | 12 +- .../distributed/thrift/TTransactionServer.java | 565 +++++++++++++++++++ .../tephra/inmemory/DetachedTxSystemClient.java | 5 + .../tephra/inmemory/InMemoryTxSystemClient.java | 6 + .../tephra/inmemory/MinimalTxSystemClient.java | 5 + .../txprune/TransactionPruningService.java | 19 +- tephra-core/src/main/thrift/transaction.thrift | 1 + .../tephra/ThriftTransactionSystemTest.java | 35 ++ .../txprune/TransactionPruningServiceTest.java | 35 +- 13 files changed, 730 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java index fe4b63e..9702c61 100644 --- 
a/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java +++ b/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java @@ -140,4 +140,9 @@ public interface TransactionSystemClient { * @return the size of invalid list */ int getInvalidSize(); + + /** + * Trigger transaction pruning now. + */ + void pruneNow(); } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java index f95e5b3..b3a9ae7 100644 --- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java @@ -48,9 +48,9 @@ import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; /** - * + * Transaction Service that includes Transaction Manager, Thrift Server, and Pruning Service. 
*/ -public final class TransactionService extends InMemoryTransactionService { +public class TransactionService extends InMemoryTransactionService { private static final Logger LOG = LoggerFactory.getLogger(TransactionService.class); private LeaderElection leaderElection; private final Configuration conf; @@ -97,7 +97,7 @@ public final class TransactionService extends InMemoryTransactionService { } }, MoreExecutors.sameThreadExecutor()); - pruningService = new TransactionPruningService(conf, txManager); + pruningService = createPruningService(conf, txManager); server = ThriftRPCServer.builder(TTransactionServer.class) .setHost(address) @@ -105,7 +105,7 @@ public final class TransactionService extends InMemoryTransactionService { .setWorkerThreads(threads) .setMaxReadBufferBytes(maxReadBufferBytes) .setIOThreads(ioThreads) - .build(new TransactionServiceThriftHandler(txManager)); + .build(new TransactionServiceThriftHandler(txManager, pruningService)); try { server.startAndWait(); pruningService.startAndWait(); @@ -142,6 +142,14 @@ public final class TransactionService extends InMemoryTransactionService { notifyStarted(); } + /** + * Called at startup to create the pruning service. + */ + @VisibleForTesting + protected TransactionPruningService createPruningService(Configuration conf, TransactionManager txManager) { + return new TransactionPruningService(conf, txManager); + } + @VisibleForTesting State thriftRPCServerState() { return server.state(); @@ -179,4 +187,16 @@ public final class TransactionService extends InMemoryTransactionService { public TransactionManager getTransactionManager() { return txManager; } + + /** + * This allows systems that embed the transaction service access to the pruning service, + * so that they can trigger pruning (rather than waiting for its scheduled run time). 
+ * + * @return null if pruning is not enabled + */ + @SuppressWarnings({"WeakerAccess", "unused"}) + @Nullable + public TransactionPruningService getTransactionPruningService() { + return pruningService; + } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java index 5f7792a..cdcca7f 100644 --- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java @@ -488,4 +488,22 @@ public class TransactionServiceClient implements TransactionSystemClient { throw Throwables.propagate(e); } } + + @Override + public void pruneNow() { + try { + this.execute( + new Operation<Void>("pruneNow") { + @Override + public Void execute(TransactionServiceThriftClient client) + throws TException { + client.pruneNow(); + return null; + } + }); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java index 8ba81e3..ccd266a 100644 --- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java @@ -302,6 +302,15 @@ public class 
TransactionServiceThriftClient { } } + public void pruneNow() throws TException { + try { + client.pruneNow(); + } catch (TException e) { + isValid.set(false); + throw e; + } + } + public boolean isValid() { return isValid.get(); } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java index 954ee1d..174b463 100644 --- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java @@ -31,6 +31,7 @@ import org.apache.tephra.distributed.thrift.TTransactionCouldNotTakeSnapshotExce import org.apache.tephra.distributed.thrift.TTransactionNotInProgressException; import org.apache.tephra.distributed.thrift.TTransactionServer; import org.apache.tephra.rpc.RPCServiceHandler; +import org.apache.tephra.txprune.TransactionPruningService; import org.apache.thrift.TException; import java.io.ByteArrayOutputStream; @@ -57,9 +58,11 @@ import java.util.Set; public class TransactionServiceThriftHandler implements TTransactionServer.Iface, RPCServiceHandler { private final TransactionManager txManager; + private final TransactionPruningService pruningService; - public TransactionServiceThriftHandler(TransactionManager txManager) { + public TransactionServiceThriftHandler(TransactionManager txManager, TransactionPruningService pruningService) { this.txManager = txManager; + this.pruningService = pruningService; } @Override @@ -206,7 +209,12 @@ public class TransactionServiceThriftHandler implements TTransactionServer.Iface } } - /* RPCServiceHandler implementation */ + @Override + 
public void pruneNow() throws TException { + pruningService.pruneNow(); + } + +/* RPCServiceHandler implementation */ @Override public void init() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java index 634350d..6c99bb4 100644 --- a/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java @@ -88,6 +88,8 @@ public class TTransactionServer { public TTransaction checkpoint(TTransaction tx) throws TTransactionNotInProgressException, org.apache.thrift.TException; + public void pruneNow() throws org.apache.thrift.TException; + } public interface AsyncIface { @@ -128,6 +130,8 @@ public class TTransactionServer { public void checkpoint(TTransaction tx, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.checkpoint_call> resultHandler) throws org.apache.thrift.TException; + public void pruneNow(org.apache.thrift.async.AsyncMethodCallback<AsyncClient.pruneNow_call> resultHandler) throws org.apache.thrift.TException; + } public static class Client extends org.apache.thrift.TServiceClient implements Iface { @@ -581,6 +585,25 @@ public class TTransactionServer { throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "checkpoint failed: unknown result"); } + public void pruneNow() throws org.apache.thrift.TException + { + send_pruneNow(); + recv_pruneNow(); + } + + public void send_pruneNow() throws org.apache.thrift.TException + { + pruneNow_args args = new pruneNow_args(); + sendBase("pruneNow", args); + } + + public void 
recv_pruneNow() throws org.apache.thrift.TException + { + pruneNow_result result = new pruneNow_result(); + receiveBase(result, "pruneNow"); + return; + } + } public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface { public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> { @@ -1163,6 +1186,35 @@ public class TTransactionServer { } } + public void pruneNow(org.apache.thrift.async.AsyncMethodCallback<pruneNow_call> resultHandler) throws org.apache.thrift.TException { + checkReady(); + pruneNow_call method_call = new pruneNow_call(resultHandler, this, ___protocolFactory, ___transport); + this.___currentMethod = method_call; + ___manager.call(method_call); + } + + public static class pruneNow_call extends org.apache.thrift.async.TAsyncMethodCall { + public pruneNow_call(org.apache.thrift.async.AsyncMethodCallback<pruneNow_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { + super(client, protocolFactory, transport, resultHandler, false); + } + + public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { + prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("pruneNow", org.apache.thrift.protocol.TMessageType.CALL, 0)); + pruneNow_args args = new pruneNow_args(); + args.write(prot); + prot.writeMessageEnd(); + } + + public void getResult() throws org.apache.thrift.TException { + if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) { + throw new IllegalStateException("Method call not finished!"); + } + org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array()); + org.apache.thrift.protocol.TProtocol prot = 
client.getProtocolFactory().getProtocol(memoryTransport); + (new Client(prot)).recv_pruneNow(); + } + } + } public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor { @@ -1194,6 +1246,7 @@ public class TTransactionServer { processMap.put("truncateInvalidTxBefore", new truncateInvalidTxBefore()); processMap.put("invalidTxSize", new invalidTxSize()); processMap.put("checkpoint", new checkpoint()); + processMap.put("pruneNow", new pruneNow()); return processMap; } @@ -1595,6 +1648,26 @@ public class TTransactionServer { } } + public static class pruneNow<I extends Iface> extends org.apache.thrift.ProcessFunction<I, pruneNow_args> { + public pruneNow() { + super("pruneNow"); + } + + public pruneNow_args getEmptyArgsInstance() { + return new pruneNow_args(); + } + + protected boolean isOneway() { + return false; + } + + public pruneNow_result getResult(I iface, pruneNow_args args) throws org.apache.thrift.TException { + pruneNow_result result = new pruneNow_result(); + iface.pruneNow(); + return result; + } + } + } public static class startLong_args implements org.apache.thrift.TBase<startLong_args, startLong_args._Fields>, java.io.Serializable, Cloneable { @@ -14782,4 +14855,496 @@ public class TTransactionServer { } + public static class pruneNow_args implements org.apache.thrift.TBase<pruneNow_args, pruneNow_args._Fields>, java.io.Serializable, Cloneable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("pruneNow_args"); + + + private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? 
extends IScheme>, SchemeFactory>(); + static { + schemes.put(StandardScheme.class, new pruneNow_argsStandardSchemeFactory()); + schemes.put(TupleScheme.class, new pruneNow_argsTupleSchemeFactory()); + } + + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { +; + + private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. 
+ */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(pruneNow_args.class, metaDataMap); + } + + public pruneNow_args() { + } + + /** + * Performs a deep copy on <i>other</i>. + */ + public pruneNow_args(pruneNow_args other) { + } + + public pruneNow_args deepCopy() { + return new pruneNow_args(this); + } + + @Override + public void clear() { + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof pruneNow_args) + return this.equals((pruneNow_args)that); + return false; + } + + public boolean equals(pruneNow_args that) { + if (that == null) + return false; + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + public int compareTo(pruneNow_args other) { + if (!getClass().equals(other.getClass())) { + return 
getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + pruneNow_args typedOther = (pruneNow_args)other; + + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("pruneNow_args("); + boolean first = true; + + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class pruneNow_argsStandardSchemeFactory implements SchemeFactory { + public pruneNow_argsStandardScheme getScheme() { + return new pruneNow_argsStandardScheme(); + } + } + + private static class pruneNow_argsStandardScheme extends StandardScheme<pruneNow_args> { + + public void read(org.apache.thrift.protocol.TProtocol iprot, pruneNow_args struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + 
iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, pruneNow_args struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class pruneNow_argsTupleSchemeFactory implements SchemeFactory { + public pruneNow_argsTupleScheme getScheme() { + return new pruneNow_argsTupleScheme(); + } + } + + private static class pruneNow_argsTupleScheme extends TupleScheme<pruneNow_args> { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, pruneNow_args struct) throws org.apache.thrift.TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, pruneNow_args struct) throws org.apache.thrift.TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + } + } + + } + + public static class pruneNow_result implements org.apache.thrift.TBase<pruneNow_result, pruneNow_result._Fields>, java.io.Serializable, Cloneable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("pruneNow_result"); + + + private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? 
extends IScheme>, SchemeFactory>(); + static { + schemes.put(StandardScheme.class, new pruneNow_resultStandardSchemeFactory()); + schemes.put(TupleScheme.class, new pruneNow_resultTupleSchemeFactory()); + } + + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { +; + + private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. 
+ */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(pruneNow_result.class, metaDataMap); + } + + public pruneNow_result() { + } + + /** + * Performs a deep copy on <i>other</i>. + */ + public pruneNow_result(pruneNow_result other) { + } + + public pruneNow_result deepCopy() { + return new pruneNow_result(this); + } + + @Override + public void clear() { + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof pruneNow_result) + return this.equals((pruneNow_result)that); + return false; + } + + public boolean equals(pruneNow_result that) { + if (that == null) + return false; + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + public int compareTo(pruneNow_result other) { + if (!getClass().equals(other.getClass())) { + 
return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + pruneNow_result typedOther = (pruneNow_result)other; + + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("pruneNow_result("); + boolean first = true; + + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class pruneNow_resultStandardSchemeFactory implements SchemeFactory { + public pruneNow_resultStandardScheme getScheme() { + return new pruneNow_resultStandardScheme(); + } + } + + private static class pruneNow_resultStandardScheme extends StandardScheme<pruneNow_result> { + + public void read(org.apache.thrift.protocol.TProtocol iprot, pruneNow_result struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField 
schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, pruneNow_result struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class pruneNow_resultTupleSchemeFactory implements SchemeFactory { + public pruneNow_resultTupleScheme getScheme() { + return new pruneNow_resultTupleScheme(); + } + } + + private static class pruneNow_resultTupleScheme extends TupleScheme<pruneNow_result> { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, pruneNow_result struct) throws org.apache.thrift.TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, pruneNow_result struct) throws org.apache.thrift.TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + } + } + + } + } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java index c8bf22a..0a8ed96 100644 --- a/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java +++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java 
@@ -140,4 +140,9 @@ public class DetachedTxSystemClient implements TransactionSystemClient { public int getInvalidSize() { return 0; } + + @Override + public void pruneNow() { + // do nothing + } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java index da38dd2..9719bcc 100644 --- a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java +++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java @@ -133,4 +133,10 @@ public class InMemoryTxSystemClient implements TransactionSystemClient { public int getInvalidSize() { return txManager.getInvalidSize(); } + + + @Override + public void pruneNow() { + // no-op: no pruning in-memory + } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java index 2f60225..b54e57f 100644 --- a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java +++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java @@ -109,4 +109,9 @@ public class MinimalTxSystemClient implements TransactionSystemClient { public int getInvalidSize() { return 0; } + + @Override + public void pruneNow() { + // do nothing + } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java 
---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java index 8d7fe2f..ae22372 100644 --- a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java +++ b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java @@ -50,6 +50,7 @@ public class TransactionPruningService extends AbstractIdleService { private final TransactionManager txManager; private final long scheduleInterval; private final boolean pruneEnabled; + private TransactionPruningRunnable pruneRunnable; private ScheduledExecutorService scheduledExecutorService; public TransactionPruningService(Configuration conf, TransactionManager txManager) { @@ -81,9 +82,8 @@ public class TransactionPruningService extends AbstractIdleService { long txPruneBufferMillis = TimeUnit.SECONDS.toMillis(conf.getLong(TxConstants.TransactionPruning.PRUNE_GRACE_PERIOD, TxConstants.TransactionPruning.DEFAULT_PRUNE_GRACE_PERIOD)); - scheduledExecutorService.scheduleAtFixedRate( - getTxPruneRunnable(txManager, plugins, txMaxLifetimeMillis, txPruneBufferMillis), - scheduleInterval, scheduleInterval, TimeUnit.SECONDS); + pruneRunnable = getTxPruneRunnable(txManager, plugins, txMaxLifetimeMillis, txPruneBufferMillis); + scheduledExecutorService.scheduleAtFixedRate(pruneRunnable, scheduleInterval, scheduleInterval, TimeUnit.SECONDS); LOG.info("Scheduled {} plugins with interval {} seconds", plugins.size(), scheduleInterval); } @@ -104,6 +104,19 @@ public class TransactionPruningService extends AbstractIdleService { LOG.info("Stopped {}", this.getClass().getSimpleName()); } + /** + * Trigger a run of the transaction pruning. It will run as soon as no pruning is running. That is, + * if pruning is running at this moment, then another will start after it is done. 
+ */ + public void pruneNow() { + if (pruneEnabled) { + scheduledExecutorService.execute(pruneRunnable); + LOG.info("Triggered invalid transaction pruning due to request received."); + } else { + LOG.info("Request to trigger transaction pruning received but pruning is not enabled."); + } + } + @VisibleForTesting TransactionPruningRunnable getTxPruneRunnable(TransactionManager txManager, Map<String, TransactionPruningPlugin> plugins, http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/thrift/transaction.thrift ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/thrift/transaction.thrift b/tephra-core/src/main/thrift/transaction.thrift index f4460e5..0e05244 100644 --- a/tephra-core/src/main/thrift/transaction.thrift +++ b/tephra-core/src/main/thrift/transaction.thrift @@ -86,4 +86,5 @@ service TTransactionServer { TBoolean truncateInvalidTxBefore(1: i64 time) throws (1: TInvalidTruncateTimeException e), i32 invalidTxSize(), TTransaction checkpoint(1: TTransaction tx) throws (1: TTransactionNotInProgressException e), + void pruneNow(), } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java index 7fca246..f4437c2 100644 --- a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java @@ -20,7 +20,9 @@ package org.apache.tephra; import com.google.inject.AbstractModule; import com.google.inject.Guice; +import com.google.inject.Inject; import com.google.inject.Injector; +import com.google.inject.Provider; import com.google.inject.Scopes; import 
com.google.inject.util.Modules; import org.apache.hadoop.conf.Configuration; @@ -34,14 +36,18 @@ import org.apache.tephra.runtime.DiscoveryModules; import org.apache.tephra.runtime.TransactionClientModule; import org.apache.tephra.runtime.TransactionModules; import org.apache.tephra.runtime.ZKModule; +import org.apache.tephra.txprune.TransactionPruningService; import org.apache.tephra.util.Tests; +import org.apache.twill.discovery.DiscoveryService; import org.apache.twill.internal.zookeeper.InMemoryZKServer; +import org.apache.twill.zookeeper.ZKClient; import org.apache.twill.zookeeper.ZKClientService; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +64,8 @@ public class ThriftTransactionSystemTest extends TransactionSystemTest { private static TransactionStateStorage storage; private static TransactionSystemClient txClient; + private static AtomicInteger pruneRuns = new AtomicInteger(); + @ClassRule public static TemporaryFolder tmpFolder = new TemporaryFolder(); @@ -83,6 +91,7 @@ public class ThriftTransactionSystemTest extends TransactionSystemTest { @Override protected void configure() { bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON); + bind(TransactionService.class).to(TestTransactionService.class).in(Scopes.SINGLETON); } }), new TransactionClientModule() @@ -150,6 +159,13 @@ public class ThriftTransactionSystemTest extends TransactionSystemTest { Assert.assertEquals(0, CountingRetryStrategyProvider.retries.get()); } + @Test + public void testPruneNow() { + int runs = pruneRuns.get(); + txClient.pruneNow(); + Assert.assertEquals(pruneRuns.get(), runs + 1); + } + // implements a retry strategy that lets us verify how many times it retried public static class CountingRetryStrategyProvider extends 
RetryNTimes.Provider { @@ -171,4 +187,23 @@ public class ThriftTransactionSystemTest extends TransactionSystemTest { }; } } + + public static class TestTransactionService extends TransactionService { + @Inject + public TestTransactionService(Configuration conf, ZKClient zkClient, + DiscoveryService discoveryService, + Provider<TransactionManager> txManagerProvider) { + super(conf, zkClient, discoveryService, txManagerProvider); + } + + @Override + protected TransactionPruningService createPruningService(Configuration conf, TransactionManager txManager) { + return new TransactionPruningService(conf, txManager) { + @Override + public void pruneNow() { + pruneRuns.incrementAndGet(); + } + }; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java b/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java index 2a0a17e..c8734d7 100644 --- a/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java @@ -19,6 +19,7 @@ package org.apache.tephra.txprune; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; @@ -67,7 +68,6 @@ public class TransactionPruningServiceTest { "org.apache.tephra.txprune.TransactionPruningServiceTest$MockPlugin2"); // Setup schedule to run every second conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true); - conf.setInt(TxConstants.TransactionPruning.PRUNE_INTERVAL, 1); conf.setInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, 10); 
conf.setLong(TxConstants.TransactionPruning.PRUNE_GRACE_PERIOD, 0); @@ -96,7 +96,11 @@ public class TransactionPruningServiceTest { pruningService.startAndWait(); // This will cause the pruning run to happen three times, // but we are interested in only first two runs for the assertions later - TimeUnit.SECONDS.sleep(3); + int pruneRuns = TestTransactionPruningRunnable.getRuns(); + pruningService.pruneNow(); + pruningService.pruneNow(); + pruningService.pruneNow(); + TestTransactionPruningRunnable.waitForRuns(pruneRuns + 3, 5, TimeUnit.MILLISECONDS); pruningService.stopAndWait(); // Assert inactive transaction bound that the plugins receive. @@ -131,7 +135,6 @@ public class TransactionPruningServiceTest { "org.apache.tephra.txprune.TransactionPruningServiceTest$MockPlugin2"); // Setup schedule to run every second conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true); - conf.setInt(TxConstants.TransactionPruning.PRUNE_INTERVAL, 1); conf.setInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, 10); conf.setLong(TxConstants.TransactionPruning.PRUNE_GRACE_PERIOD, 0); @@ -160,7 +163,11 @@ public class TransactionPruningServiceTest { pruningService.startAndWait(); // This will cause the pruning run to happen three times, // but we are interested in only first two runs for the assertions later - TimeUnit.SECONDS.sleep(3); + int pruneRuns = TestTransactionPruningRunnable.getRuns(); + pruningService.pruneNow(); + pruningService.pruneNow(); + pruningService.pruneNow(); + TestTransactionPruningRunnable.waitForRuns(pruneRuns + 3, 5, TimeUnit.MILLISECONDS); pruningService.stopAndWait(); // Assert inactive transaction bound @@ -233,6 +240,7 @@ public class TransactionPruningServiceTest { * Extends {@link TransactionPruningRunnable} to use mock time to help in testing. 
*/ private static class TestTransactionPruningRunnable extends TransactionPruningRunnable { + private static int pruneRuns = 0; private static Iterator<Long> currentTime; TestTransactionPruningRunnable(TransactionManager txManager, Map<String, TransactionPruningPlugin> plugins, long txMaxLifetimeMillis, long txPruneBufferMillis) { @@ -247,6 +255,25 @@ public class TransactionPruningServiceTest { static void setCurrentTime(Iterator<Long> currentTime) { TestTransactionPruningRunnable.currentTime = currentTime; } + + @Override + public void run() { + super.run(); + pruneRuns++; + } + + private static int getRuns() { + return pruneRuns; + } + + public static void waitForRuns(int runs, int timeout, TimeUnit unit) throws Exception { + long timeoutMillis = unit.toMillis(timeout); + Stopwatch stopWatch = new Stopwatch(); + stopWatch.start(); + while (pruneRuns < runs && stopWatch.elapsedMillis() < timeoutMillis) { + TimeUnit.MILLISECONDS.sleep(100); + } + } } /**
