Repository: incubator-omid Updated Branches: refs/heads/0.9.0 [created] 558c640b8
[OMID-62] Use interface instead of TSOClient in AbstractTransactionManager This will make it easy to mock AbstractTransactionManager dependencies This closes #9 Change-Id: Ia3c0d59261d2c8abba4509ff7d97ada93e69c758 Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/4e9cd997 Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/4e9cd997 Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/4e9cd997 Branch: refs/heads/0.9.0 Commit: 4e9cd997653b15a483192d11061c9dd3e82b0a21 Parents: 0c37136 Author: Francisco Perez-Sorrosal <fpe...@yahoo-inc.com> Authored: Thu Mar 2 17:26:02 2017 -0800 Committer: Francisco Perez-Sorrosal <fperezsorro...@apache.org> Committed: Thu Aug 10 11:30:12 2017 -0700 ---------------------------------------------------------------------- .../transaction/HBaseTransactionManager.java | 25 ++++++++++--------- .../transaction/AbstractTransactionManager.java | 26 +++++++++++++++++--- .../apache/omid/tso/client/MockTSOClient.java | 15 ++++++++--- .../org/apache/omid/tso/client/TSOClient.java | 4 +-- .../org/apache/omid/tso/client/TSOProtocol.java | 7 ++++++ 5 files changed, 57 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4e9cd997/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java index 990758c..b31d2c9 100644 --- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java +++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java @@ -30,6 +30,7 @@ import 
org.apache.omid.committable.hbase.HBaseCommitTableConfig; import org.apache.omid.tools.hbase.HBaseLogin; import org.apache.omid.tso.client.CellId; import org.apache.omid.tso.client.TSOClient; +import org.apache.omid.tso.client.TSOProtocol; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; @@ -73,40 +74,40 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen } @VisibleForTesting - static class Builder { + public static class Builder { // Required parameters private final HBaseOmidClientConfiguration hbaseOmidClientConf; // Optional parameters - initialized to default values - private Optional<TSOClient> tsoClient = Optional.absent(); + private Optional<TSOProtocol> tsoClient = Optional.absent(); private Optional<CommitTable.Client> commitTableClient = Optional.absent(); private Optional<PostCommitActions> postCommitter = Optional.absent(); - private Builder(HBaseOmidClientConfiguration hbaseOmidClientConf) { + public Builder(HBaseOmidClientConfiguration hbaseOmidClientConf) { this.hbaseOmidClientConf = hbaseOmidClientConf; } - Builder tsoClient(TSOClient tsoClient) { + public Builder tsoClient(TSOProtocol tsoClient) { this.tsoClient = Optional.of(tsoClient); return this; } - Builder commitTableClient(CommitTable.Client client) { + public Builder commitTableClient(CommitTable.Client client) { this.commitTableClient = Optional.of(client); return this; } - Builder postCommitter(PostCommitActions postCommitter) { + public Builder postCommitter(PostCommitActions postCommitter) { this.postCommitter = Optional.of(postCommitter); return this; } - HBaseTransactionManager build() throws IOException, InterruptedException { + public HBaseTransactionManager build() throws IOException, InterruptedException { CommitTable.Client commitTableClient = this.commitTableClient.or(buildCommitTableClient()).get(); PostCommitActions postCommitter = 
this.postCommitter.or(buildPostCommitter(commitTableClient)).get(); - TSOClient tsoClient = this.tsoClient.or(buildTSOClient()).get(); + TSOProtocol tsoClient = this.tsoClient.or(buildTSOClient()).get(); return new HBaseTransactionManager(hbaseOmidClientConf, postCommitter, @@ -115,8 +116,8 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen new HBaseTransactionFactory()); } - private Optional<TSOClient> buildTSOClient() throws IOException, InterruptedException { - return Optional.of(TSOClient.newInstance(hbaseOmidClientConf.getOmidClientConfiguration())); + private Optional<TSOProtocol> buildTSOClient() throws IOException, InterruptedException { + return Optional.of((TSOProtocol) TSOClient.newInstance(hbaseOmidClientConf.getOmidClientConfiguration())); } @@ -151,13 +152,13 @@ public class HBaseTransactionManager extends AbstractTransactionManager implemen } @VisibleForTesting - static Builder builder(HBaseOmidClientConfiguration hbaseOmidClientConf) { + public static Builder builder(HBaseOmidClientConfiguration hbaseOmidClientConf) { return new Builder(hbaseOmidClientConf); } private HBaseTransactionManager(HBaseOmidClientConfiguration hBaseOmidClientConfiguration, PostCommitActions postCommitter, - TSOClient tsoClient, + TSOProtocol tsoClient, CommitTable.Client commitTableClient, HBaseTransactionFactory hBaseTransactionFactory) { http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4e9cd997/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java ---------------------------------------------------------------------- diff --git a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java index e7dc8cf..bf7d752 100644 --- a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java +++ 
b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java @@ -30,7 +30,7 @@ import org.apache.omid.tso.client.AbortException; import org.apache.omid.tso.client.CellId; import org.apache.omid.tso.client.ConnectionException; import org.apache.omid.tso.client.ServiceUnavailableException; -import org.apache.omid.tso.client.TSOClient; +import org.apache.omid.tso.client.TSOProtocol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +63,7 @@ public abstract class AbstractTransactionManager implements TransactionManager { } private final PostCommitActions postCommitter; - protected final TSOClient tsoClient; + protected final TSOProtocol tsoClient; protected final CommitTable.Client commitTableClient; private final TransactionFactory<? extends CellId> transactionFactory; @@ -92,7 +92,7 @@ public abstract class AbstractTransactionManager implements TransactionManager { */ public AbstractTransactionManager(MetricsRegistry metrics, PostCommitActions postCommitter, - TSOClient tsoClient, + TSOProtocol tsoClient, CommitTable.Client commitTableClient, TransactionFactory<? extends CellId> transactionFactory) { @@ -356,6 +356,26 @@ public abstract class AbstractTransactionManager implements TransactionManager { } /** + * This function returns the commit timestamp for a particular cell if the transaction was already committed in + * the system. In case the transaction was not committed and the cell was written by a transaction initialized by a + * previous TSO server, an invalidation attempt occurs. + * Otherwise the function returns a value that indicates that the commit timestamp was not found. + * @param cellStartTimestamp + * start timestamp of the cell to locate the commit timestamp for. + * @param locator + * a locator to find the commit timestamp in the system. 
+ * @return the commit timestamp together with the location where it was found + * or an object indicating that it was not found in the system + * @throws IOException in case of any I/O issues + */ + public CommitTimestamp locateCellCommitTimestamp(long cellStartTimestamp, + CommitTimestampLocator locator) throws IOException { + + return locateCellCommitTimestamp(cellStartTimestamp, tsoClient.getEpoch(), locator); + + } + + /** * @see java.io.Closeable#close() */ @Override http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4e9cd997/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java ---------------------------------------------------------------------- diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java index b4a205e..0511e0f 100644 --- a/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java +++ b/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java @@ -24,15 +24,18 @@ import java.io.IOException; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; -class MockTSOClient implements TSOProtocol { - private final AtomicLong timestampGenerator = new AtomicLong(); +// TODO Would be nice to compile all util classes for testing to a separate package that clients could import for tests +public class MockTSOClient implements TSOProtocol { + private static final int CONFLICT_MAP_SIZE = 1_000_000; + + private final AtomicLong timestampGenerator = new AtomicLong(); private final long[] conflictMap = new long[CONFLICT_MAP_SIZE]; private final AtomicLong lwm = new AtomicLong(); private final CommitTable.Writer commitTable; - MockTSOClient(CommitTable.Writer commitTable) { + public MockTSOClient(CommitTable.Writer commitTable) { this.commitTable = commitTable; } @@ -99,4 +102,10 @@ class MockTSOClient implements TSOProtocol { f.set(null); return new 
ForwardingTSOFuture<>(f); } + + @Override + public long getEpoch() { + return 0; + } + } http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4e9cd997/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java ---------------------------------------------------------------------- diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java index 1690ca6..fd92792 100644 --- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java +++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java @@ -236,9 +236,9 @@ public class TSOClient implements TSOProtocol, NodeCacheListener { // ---------------------------------------------------------------------------------------------------------------- /** - * Used for high availability support. - * @return the epoch of the TSO server that initialized this transaction. + * @see TSOProtocol#getEpoch() */ + @Override public long getEpoch() { return epoch; } http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4e9cd997/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java ---------------------------------------------------------------------- diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java index 198913a..fae4b96 100644 --- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java +++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java @@ -25,6 +25,13 @@ import java.util.Set; public interface TSOProtocol { /** + * Returns the epoch of the current TSO server. Used in HA mode. + * + * @return the epoch. + */ + long getEpoch(); + + /** * Returns a new timestamp assigned by on the server-side * @return the newly assigned timestamp as a future. 
If an error was detected, the future will contain a * corresponding protocol exception