http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java ---------------------------------------------------------------------- diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java b/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java new file mode 100644 index 0000000..66a70fb --- /dev/null +++ b/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.omid.tso;

import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.timestamp.storage.TimestampStorage;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

/**
 * Unit tests for {@link TimestampOracleImpl}, run against mocked metrics, panicker and
 * timestamp storage collaborators.
 */
public class TestTimestampOracle {

    private static final Logger LOG = LoggerFactory.getLogger(TestTimestampOracle.class);

    /**
     * Maximum time to wait for the panic to be reported. Kept below the 10s TestNG timeout of
     * the test that uses it.
     */
    private static final int PANIC_VERIFICATION_TIMEOUT_MS = 5_000;

    @Mock
    private MetricsRegistry metrics;
    @Mock
    private Panicker panicker;
    @Mock
    private TimestampStorage timestampStorage;

    // Component under test
    @InjectMocks
    private TimestampOracleImpl timestampOracle;

    /** Creates fresh mocks and injects them into the component under test before each test. */
    @BeforeMethod(alwaysRun = true, timeOut = 30_000)
    public void initMocksAndComponents() {
        MockitoAnnotations.initMocks(this);
    }

    /**
     * Requests three batches worth of timestamps and checks that each one is exactly the
     * previous one plus one, and that {@code getLast()} reports the last value handed out.
     */
    @Test(timeOut = 10_000)
    public void testMonotonicTimestampGrowth() throws Exception {

        // Initialize component under test
        timestampOracle.initialize();

        long last = timestampOracle.next();
        for (int i = 0; i < (3 * TimestampOracleImpl.TIMESTAMP_BATCH); i++) {
            long current = timestampOracle.next();
            assertEquals(current, last + 1, "Not monotonic growth");
            last = current;
        }
        // assertEquals (rather than assertTrue(a == b)) reports both values on failure
        assertEquals(timestampOracle.getLast(), last, "getLast() should return the last timestamp handed out");
        LOG.info("Last timestamp: {}", last);
    }

    /**
     * Makes the storage fail when the max timestamp is updated and checks that the failure is
     * reported through the {@link Panicker}.
     */
    @Test(timeOut = 10_000)
    public void testTimestampOraclePanicsWhenTheStorageHasProblems() throws Exception {

        // Initialize component under test
        timestampOracle.initialize();

        // Cause an exception when updating the max timestamp
        final CountDownLatch updateMaxTimestampMethodCalled = new CountDownLatch(1);
        doAnswer(new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                updateMaxTimestampMethodCalled.countDown();
                throw new RuntimeException("Out of memory or something");
            }
        }).when(timestampStorage).updateMaxTimestamp(anyLong(), anyLong());

        // Make the previous exception to be thrown
        Thread allocThread = new Thread("AllocThread") {
            @Override
            public void run() {
                try {
                    // Stop asking for timestamps once the test interrupts this thread
                    while (!isInterrupted()) {
                        timestampOracle.next();
                    }
                } catch (IOException ioe) {
                    LOG.error("Shouldn't occur", ioe);
                }
            }
        };
        // Daemon, so a thread stuck inside next() can never keep the test JVM alive
        allocThread.setDaemon(true);
        allocThread.start();

        try {
            updateMaxTimestampMethodCalled.await();

            // The latch is released inside the stub *before* the exception leaves it, so the
            // panic may not have been reported yet when we get here: poll for it instead of
            // verifying only once.
            verify(panicker, timeout(PANIC_VERIFICATION_TIMEOUT_MS).atLeastOnce())
                    .panic(anyString(), any(Throwable.class));
        } finally {
            allocThread.interrupt();
        }
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientAccessor.java ---------------------------------------------------------------------- diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientAccessor.java b/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientAccessor.java new file mode 100644 index 0000000..74ed196 --- /dev/null +++ b/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientAccessor.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.omid.tso.client;

import org.apache.statemachine.StateMachine.FsmImpl;

/**
 * Test helper living in the client package so it can reach the {@code fsm} field of a
 * {@link TSOClient} and the {@code channel} field of its connected state.
 */
public class TSOClientAccessor {

    /**
     * Closes the channel held by the client's current state and blocks until the close
     * operation has finished. The current state is cast to {@code TSOClient.ConnectedState},
     * so the call fails with a {@link ClassCastException} when the client is in any other state.
     *
     * @param tsoClient client whose channel is closed
     * @throws InterruptedException if interrupted while waiting for the close to finish
     */
    public static void closeChannel(TSOClient tsoClient) throws InterruptedException {
        TSOClient.ConnectedState state =
                (TSOClient.ConnectedState) ((FsmImpl) tsoClient.fsm).getState();
        state.channel.close().await();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientOneShot.java ---------------------------------------------------------------------- diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientOneShot.java b/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientOneShot.java new file mode 100644 index 0000000..ff60753 --- /dev/null +++ b/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientOneShot.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/
package org.apache.omid.tso.client;

import org.apache.omid.proto.TSOProto;
import org.apache.omid.proto.TSOProto.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutionException;

/**
 * Test client that opens a new {@link TSOClientRaw} connection for every request: it performs
 * the handshake, sends the single request, returns the response and closes the connection.
 */
public class TSOClientOneShot {

    private static final Logger LOG = LoggerFactory.getLogger(TSOClientOneShot.class);

    private final String host;
    private final int port;

    /**
     * @param host host the raw client connects to on each request
     * @param port port the raw client connects to on each request
     */
    public TSOClientOneShot(String host, int port) {

        this.host = host;
        this.port = port;

    }

    /**
     * Connects, performs the handshake, sends {@code request} and returns the response to it.
     * The connection is closed before returning, also when the handshake or the request fails.
     *
     * @param request request to send once the handshake has completed
     * @return the response received for {@code request}
     * @throws InterruptedException if interrupted while waiting for a response or while closing
     * @throws ExecutionException   if waiting for a response fails
     * @throws AssertionError       if the handshake response is not flagged as client compatible
     */
    public TSOProto.Response makeRequest(TSOProto.Request request)
            throws InterruptedException, ExecutionException {
        TSOClientRaw raw = new TSOClientRaw(host, port);
        try {
            // do handshake
            TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
            handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
            raw.write(TSOProto.Request.newBuilder()
                    .setHandshakeRequest(handshake.build()).build());
            Response handshakeResponse = raw.getResponse().get();
            // Explicit check instead of the 'assert' keyword, which is skipped when the JVM
            // runs without -ea; AssertionError keeps the failure type unchanged.
            if (!handshakeResponse.getHandshakeResponse().getClientCompatible()) {
                throw new AssertionError(
                        "Handshake with " + host + ":" + port + " was not flagged as client compatible");
            }

            raw.write(request);
            return raw.getResponse().get();
        } finally {
            // Always release the connection, not only on the success path
            raw.close();
        }
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientRaw.java ---------------------------------------------------------------------- diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientRaw.java b/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientRaw.java new file mode 100644 index 0000000..beb6c47 --- /dev/null +++ b/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientRaw.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.omid.tso.client;

import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.omid.proto.TSOProto;
import org.apache.omid.proto.TSOProto.Response;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Raw client for communicating with tso server directly with protobuf messages.
 *
 * <p>Callers register interest in the next incoming event with {@link #getResponse()}; every
 * message, exception or disconnection seen by the channel handler completes the oldest
 * registered future.
 */
public class TSOClientRaw {

    private static final Logger LOG = LoggerFactory.getLogger(TSOClientRaw.class);

    // Futures waiting to be completed by the channel handler, oldest first
    private final BlockingQueue<SettableFuture<Response>> responseQueue = new ArrayBlockingQueue<>(5);
    private final Channel channel;

    /**
     * Builds the Netty pipeline (length-prefixed protobuf frames) and connects to the server.
     *
     * @param host server host
     * @param port server port
     * @throws InterruptedException if interrupted while waiting for the connection attempt
     * @throws ExecutionException   if the connection attempt does not succeed
     */
    public TSOClientRaw(String host, int port) throws InterruptedException, ExecutionException {
        // Start client with Nb of active threads = 3 as maximum.
        ChannelFactory factory = new NioClientSocketChannelFactory(
                Executors.newCachedThreadPool(
                        new ThreadFactoryBuilder().setNameFormat("tsoclient-boss-%d").build()),
                Executors.newCachedThreadPool(
                        new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build()), 3);
        // Create the bootstrap
        ClientBootstrap bootstrap = new ClientBootstrap(factory);

        InetSocketAddress addr = new InetSocketAddress(host, port);

        ChannelPipeline pipeline = bootstrap.getPipeline();
        pipeline.addLast("lengthbaseddecoder",
                new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
        pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
        pipeline.addLast("protobufdecoder",
                new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
        pipeline.addLast("protobufencoder", new ProtobufEncoder());
        pipeline.addLast("handler", new Handler());

        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("keepAlive", true);
        bootstrap.setOption("reuseAddress", true);
        bootstrap.setOption("connectTimeoutMillis", 100);

        ChannelFuture channelFuture = bootstrap.connect(addr).await();
        // Fail fast instead of handing out a client whose channel never got connected
        if (!channelFuture.isSuccess()) {
            throw new ExecutionException("Could not connect to " + addr, channelFuture.getCause());
        }
        channel = channelFuture.getChannel();
    }

    /**
     * Writes the request to the channel without waiting for the write to complete.
     *
     * @param request protobuf request to send
     */
    public void write(TSOProto.Request request) {
        channel.write(request);
    }

    /**
     * Registers a future that the handler completes with the next message, exception or
     * disconnection it takes from the channel. Blocks while the queue of pending futures is full.
     *
     * @return future for the next event delivered by the handler
     * @throws InterruptedException if interrupted while waiting for space in the queue
     */
    public Future<Response> getResponse() throws InterruptedException {
        SettableFuture<Response> future = SettableFuture.<Response>create();
        responseQueue.put(future);
        return future;
    }

    /**
     * Queues one placeholder future and then closes the channel.
     * NOTE(review): the placeholder is presumably there so that the disconnection handler,
     * which takes one future from the queue, has something to take - confirm.
     *
     * @throws InterruptedException if interrupted while waiting for space in the queue
     */
    public void close() throws InterruptedException {
        responseQueue.put(SettableFuture.<Response>create());
        channel.close();
    }

    /** Channel handler that completes the oldest pending future for every event it sees. */
    private class Handler extends SimpleChannelHandler {
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
            // '{}' placeholder added: without it SLF4J silently dropped the event argument
            LOG.info("Message received {}", e);
            if (e.getMessage() instanceof Response) {
                Response resp = (Response) e.getMessage();
                try {
                    SettableFuture<Response> future = responseQueue.take();
                    future.set(resp);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    LOG.warn("Interrupted in handler", ie);
                }
            } else {
                LOG.warn("Received unknown message {}", e.getMessage());
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
            LOG.info("Exception received", e.getCause());
            failOldestPendingResponse(e.getCause());
        }

        @Override
        public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
                throws Exception {
            LOG.info("Disconnected");
            failOldestPendingResponse(new ConnectionException());
        }

        // Waits for the oldest pending future and completes it exceptionally with the cause.
        private void failOldestPendingResponse(Throwable cause) {
            try {
                SettableFuture<Response> future = responseQueue.take();
                future.setException(cause);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                LOG.warn("Interrupted handling exception", ie);
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java ---------------------------------------------------------------------- diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java new file mode 100644 index 0000000..9f7263e --- /dev/null +++
b/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java @@ -0,0 +1,233 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.omid.tso.client;

import com.google.common.collect.Sets;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import org.apache.omid.TestUtils;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.tso.TSOMockModule;
import org.apache.omid.tso.TSOServer;
import org.apache.omid.tso.TSOServerConfig;
import org.apache.omid.tso.util.DummyCellIdImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertNotNull;

/**
 * Integration tests that start one TSO server (built from {@link TSOMockModule}) for the whole
 * class and exercise it through two {@link TSOClient} instances and the commit table client.
 */
public class TestIntegrationOfTSOClientServerBasicFunctionality {

    private static final Logger LOG = LoggerFactory.getLogger(TestIntegrationOfTSOClientServerBasicFunctionality.class);

    private static final String TSO_SERVER_HOST = "localhost";
    private int tsoServerPortForTest;

    // Cells for tests
    private final static CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
    private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL);

    // Required infrastructure for TSO tsoClient-server integration testing
    private TSOServer tsoServer;
    private TSOClient tsoClient;
    private TSOClient justAnotherTSOClient;
    private CommitTable.Client commitTableClient;

    /** Starts the TSO server on a free local port and connects both clients directly to it. */
    @BeforeClass
    public void setup() throws Exception {

        tsoServerPortForTest = TestUtils.getFreeLocalPort();

        TSOServerConfig tsoConfig = new TSOServerConfig();
        tsoConfig.setMaxItems(1000);
        tsoConfig.setPort(tsoServerPortForTest);
        Module tsoServerMockModule = new TSOMockModule(tsoConfig);
        Injector injector = Guice.createInjector(tsoServerMockModule);

        CommitTable commitTable = injector.getInstance(CommitTable.class);
        commitTableClient = commitTable.getClient();

        LOG.info("==================================================================================================");
        LOG.info("======================================= Init TSO Server ==========================================");
        LOG.info("==================================================================================================");

        tsoServer = injector.getInstance(TSOServer.class);
        tsoServer.startAndWait();
        TestUtils.waitForSocketListening(TSO_SERVER_HOST, tsoServerPortForTest, 100);

        LOG.info("==================================================================================================");
        LOG.info("===================================== TSO Server Initialized =====================================");
        LOG.info("==================================================================================================");

        LOG.info("==================================================================================================");
        LOG.info("======================================= Setup TSO Clients ========================================");
        LOG.info("==================================================================================================");

        // Configure direct connection to the server
        OmidClientConfiguration tsoClientConf = new OmidClientConfiguration();
        tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + tsoServerPortForTest);

        tsoClient = TSOClient.newInstance(tsoClientConf);
        justAnotherTSOClient = TSOClient.newInstance(tsoClientConf);

        LOG.info("==================================================================================================");
        LOG.info("===================================== TSO Clients Initialized ====================================");
        LOG.info("==================================================================================================");

        Thread.currentThread().setName("Test Thread");

    }

    /** Closes both clients, stops the server and waits until its port is no longer listening. */
    @AfterClass
    public void tearDown() throws Exception {

        tsoClient.close().get();
        // The second client is created in setup() as well, so it has to be released here too
        justAnotherTSOClient.close().get();

        tsoServer.stopAndWait();
        tsoServer = null;
        TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, tsoServerPortForTest, 1000);

    }

    /** Start and commit timestamps obtained one after another must increase by exactly one. */
    @Test(timeOut = 30_000)
    public void testTimestampsOrderingGrowMonotonically() throws Exception {
        long referenceTimestamp;
        long startTsTx1 = tsoClient.getNewStartTimestamp().get();
        referenceTimestamp = startTsTx1;

        long startTsTx2 = tsoClient.getNewStartTimestamp().get();
        assertEquals(startTsTx2, ++referenceTimestamp, "Should grow monotonically");
        assertTrue(startTsTx2 > startTsTx1, "Two timestamps obtained consecutively should grow");

        long commitTsTx2 = tsoClient.commit(startTsTx2, Sets.newHashSet(c1)).get();
        assertEquals(commitTsTx2, ++referenceTimestamp, "Should grow monotonically");

        long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c2)).get();
        assertEquals(commitTsTx1, ++referenceTimestamp, "Should grow monotonically");

        long startTsTx3 = tsoClient.getNewStartTimestamp().get();
        assertEquals(startTsTx3, ++referenceTimestamp, "Should grow monotonically");
    }

    /** A transaction with an empty write set commits with a timestamp above its start one. */
    @Test(timeOut = 30_000)
    public void testSimpleTransactionWithNoWriteSetCanCommit() throws Exception {
        long startTsTx1 = tsoClient.getNewStartTimestamp().get();
        long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.<CellId>newHashSet()).get();
        assertTrue(commitTsTx1 > startTsTx1);
    }

    /** A transaction writing one million distinct cells can still commit. */
    @Test(timeOut = 30_000)
    public void testTransactionWithMassiveWriteSetCanCommit() throws Exception {
        long startTs = tsoClient.getNewStartTimestamp().get();

        Set<CellId> cells = new HashSet<>();
        for (int i = 0; i < 1_000_000; i++) {
            cells.add(new DummyCellIdImpl(i));
        }

        long commitTs = tsoClient.commit(startTs, cells).get();
        assertTrue(commitTs > startTs, "Commit TS should be higher than Start TS");
    }

    /** Transactions that run strictly one after another commit even when they share cells. */
    @Test(timeOut = 30_000)
    public void testMultipleSerialCommitsDoNotConflict() throws Exception {
        long startTsTx1 = tsoClient.getNewStartTimestamp().get();
        long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c1)).get();
        assertTrue(commitTsTx1 > startTsTx1, "Commit TS must be greater than Start TS");

        long startTsTx2 = tsoClient.getNewStartTimestamp().get();
        assertTrue(startTsTx2 > commitTsTx1, "TS should grow monotonically");

        long commitTsTx2 = tsoClient.commit(startTsTx2, Sets.newHashSet(c1, c2)).get();
        assertTrue(commitTsTx2 > startTsTx2, "Commit TS must be greater than Start TS");

        long startTsTx3 = tsoClient.getNewStartTimestamp().get();
        long commitTsTx3 = tsoClient.commit(startTsTx3, Sets.newHashSet(c2)).get();
        assertTrue(commitTsTx3 > startTsTx3, "Commit TS must be greater than Start TS");
    }

    /** The commit table holds no entry before the commit and the commit timestamp after it. */
    @Test(timeOut = 30_000)
    public void testCommitWritesToCommitTable() throws Exception {
        long startTsForTx1 = tsoClient.getNewStartTimestamp().get();
        long startTsForTx2 = tsoClient.getNewStartTimestamp().get();
        assertTrue(startTsForTx2 > startTsForTx1, "Start TS should grow");

        assertFalse(commitTableClient.getCommitTimestamp(startTsForTx1).get().isPresent(),
                "Commit TS for Tx1 shouldn't appear in Commit Table");

        long commitTsForTx1 = tsoClient.commit(startTsForTx1, Sets.newHashSet(c1)).get();
        assertTrue(commitTsForTx1 > startTsForTx1, "Commit TS should be higher than Start TS for the same tx");

        // Check presence explicitly: a null-check on the already unwrapped value could never
        // fail, because unwrapping an absent entry throws before the check is reached.
        assertTrue(commitTableClient.getCommitTimestamp(startTsForTx1).get().isPresent(),
                "Tx is committed, should return as such from Commit Table");
        long commitTs1InCommitTable = commitTableClient.getCommitTimestamp(startTsForTx1).get().get().getValue();
        assertEquals(commitTsForTx1, commitTs1InCommitTable,
                "getCommitTimestamp() & commit() should report same Commit TS value for same tx");
        assertTrue(commitTs1InCommitTable > startTsForTx2, "Commit TS should be higher than tx's Start TS");
    }

    /** Of two concurrent transactions writing the same cell, the second to commit is aborted. */
    @Test(timeOut = 30_000)
    public void testTwoConcurrentTxWithOverlappingWritesetsHaveConflicts() throws Exception {
        long startTsTx1 = tsoClient.getNewStartTimestamp().get();
        long startTsTx2 = tsoClient.getNewStartTimestamp().get();
        assertTrue(startTsTx2 > startTsTx1, "Second TX should have higher TS");

        long commitTsTx1 = tsoClient.commit(startTsTx1, Sets.newHashSet(c1)).get();
        assertTrue(commitTsTx1 > startTsTx1, "Commit TS must be higher than Start TS for the same tx");

        try {
            tsoClient.commit(startTsTx2, Sets.newHashSet(c1, c2)).get();
            Assert.fail("Second TX should fail on commit");
        } catch (ExecutionException ee) {
            // TestNG order is (actual, expected, message)
            assertEquals(ee.getCause().getClass(), AbortException.class, "Should have aborted");
        }
    }

    /**
     * Conflict detection and timestamp ordering hold when a second client talks to the same
     * server: the aborted transaction leaves no commit table entry, and the first commit falls
     * between the start timestamps handed out before and after it.
     */
    @Test(timeOut = 30_000)
    public void testConflictsAndMonotonicallyTimestampGrowthWithTwoDifferentTSOClients() throws Exception {
        long startTsTx1Client1 = tsoClient.getNewStartTimestamp().get();
        long startTsTx2Client1 = tsoClient.getNewStartTimestamp().get();
        long startTsTx3Client1 = tsoClient.getNewStartTimestamp().get();

        tsoClient.commit(startTsTx1Client1, Sets.newHashSet(c1)).get();
        try {
            tsoClient.commit(startTsTx3Client1, Sets.newHashSet(c1, c2)).get();
            Assert.fail("Second commit should fail as conflicts with the previous concurrent one");
        } catch (ExecutionException ee) {
            // TestNG order is (actual, expected, message)
            assertEquals(ee.getCause().getClass(), AbortException.class, "Should have aborted");
        }
        long startTsTx4Client2 = justAnotherTSOClient.getNewStartTimestamp().get();

        assertFalse(commitTableClient.getCommitTimestamp(startTsTx3Client1).get().isPresent(), "Tx3 didn't commit");
        long commitTSTx1 = commitTableClient.getCommitTimestamp(startTsTx1Client1).get().get().getValue();
        assertTrue(commitTSTx1 > startTsTx2Client1, "Tx1 committed after Tx2 started");
        assertTrue(commitTSTx1 < startTsTx4Client2, "Tx1 committed before Tx4 started on the other TSO Client");
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientConnectionToTSO.java ---------------------------------------------------------------------- diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientConnectionToTSO.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientConnectionToTSO.java new file mode 100644 index 0000000..2b4c312 --- /dev/null +++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientConnectionToTSO.java @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.tso.client; + +import com.google.inject.Guice; +import com.google.inject.Injector; +import org.apache.omid.TestUtils; +import org.apache.omid.tso.HALeaseManagementModule; +import org.apache.omid.tso.TSOMockModule; +import org.apache.omid.tso.TSOServer; +import org.apache.omid.tso.TSOServerConfig; +import org.apache.omid.tso.VoidLeaseManagementModule; +import org.apache.statemachine.StateMachine.FsmImpl; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.test.TestingServer; +import org.apache.curator.utils.CloseableUtils; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.concurrent.ExecutionException; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +public class TestTSOClientConnectionToTSO { + + private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientConnectionToTSO.class); + + // Constants and variables for component connectivity + private static final String TSO_HOST = "localhost"; + private static final String CURRENT_TSO_PATH = "/current_tso_path"; + private static final String TSO_LEASE_PATH = "/tso_lease_path"; + + private int tsoPortForTest; 
+ private String zkClusterForTest; + + private Injector injector = null; + + private TestingServer zkServer; + + private CuratorFramework zkClient; + private TSOServer tsoServer; + + @BeforeMethod + public void beforeMethod() throws Exception { + + tsoPortForTest = TestUtils.getFreeLocalPort(); + + int zkPortForTest = TestUtils.getFreeLocalPort(); + zkClusterForTest = TSO_HOST + ":" + zkPortForTest; + LOG.info("Starting ZK Server in port {}", zkPortForTest); + zkServer = TestUtils.provideTestingZKServer(zkPortForTest); + LOG.info("ZK Server Started @ {}", zkServer.getConnectString()); + + zkClient = TestUtils.provideConnectedZKClient(zkClusterForTest); + + Stat stat; + try { + zkClient.delete().forPath(CURRENT_TSO_PATH); + stat = zkClient.checkExists().forPath(CURRENT_TSO_PATH); + assertNull(stat, CURRENT_TSO_PATH + " should not exist"); + } catch (NoNodeException e) { + LOG.info("{} ZNode did not exist", CURRENT_TSO_PATH); + } + + } + + @AfterMethod + public void afterMethod() { + + zkClient.close(); + + CloseableUtils.closeQuietly(zkServer); + zkServer = null; + LOG.info("ZK Server Stopped"); + + } + + @Test(timeOut = 30_000) + public void testUnsuccessfulConnectionToTSO() throws Exception { + + // When no HA node for TSOServer is found & no host:port config exists + // we should get an exception when getting the client + try { + TSOClient.newInstance(new OmidClientConfiguration()); + } catch (IllegalArgumentException e) { + // Expected + } + + } + + @Test(timeOut = 30_000) + public void testSuccessfulConnectionToTSOWithHostAndPort() throws Exception { + + // Launch a TSO WITHOUT publishing the address in HA... 
+ TSOServerConfig tsoConfig = new TSOServerConfig(); + tsoConfig.setMaxItems(1000); + tsoConfig.setPort(tsoPortForTest); + tsoConfig.setLeaseModule(new VoidLeaseManagementModule()); + injector = Guice.createInjector(new TSOMockModule(tsoConfig)); + LOG.info("Starting TSO"); + tsoServer = injector.getInstance(TSOServer.class); + tsoServer.startAndWait(); + TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100); + LOG.info("Finished loading TSO"); + + // When no HA node for TSOServer is found we should get a connection + // to the TSO through the host:port configured... + OmidClientConfiguration tsoClientConf = new OmidClientConfiguration(); + tsoClientConf.setConnectionString("localhost:" + tsoPortForTest); + tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH); + TSOClient tsoClient = TSOClient.newInstance(tsoClientConf); + + // ... so we should get responses from the methods + Long startTS = tsoClient.getNewStartTimestamp().get(); + LOG.info("Start TS {} ", startTS); + assertEquals(startTS.longValue(), 1); + + // Close the tsoClient connection and stop the TSO Server + tsoClient.close().get(); + tsoServer.stopAndWait(); + tsoServer = null; + TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000); + LOG.info("TSO Server Stopped"); + + } + + @Test(timeOut = 30_000) + public void testSuccessfulConnectionToTSOThroughZK() throws Exception { + + // Launch a TSO publishing the address in HA... 
+ TSOServerConfig config = new TSOServerConfig(); + config.setMaxItems(1000); + config.setPort(tsoPortForTest); + config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid")); + injector = Guice.createInjector(new TSOMockModule(config)); + LOG.info("Starting TSO"); + tsoServer = injector.getInstance(TSOServer.class); + tsoServer.startAndWait(); + TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100); + LOG.info("Finished loading TSO"); + + waitTillTsoRegisters(injector.getInstance(CuratorFramework.class)); + + // When a HA node for TSOServer is found we should get a connection + OmidClientConfiguration tsoClientConf = new OmidClientConfiguration(); + tsoClientConf.setConnectionType(OmidClientConfiguration.ConnType.HA); + tsoClientConf.setConnectionString(zkClusterForTest); + tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH); + TSOClient tsoClient = TSOClient.newInstance(tsoClientConf); + + // ... so we should get responses from the methods + Long startTS = tsoClient.getNewStartTimestamp().get(); + LOG.info("Start TS {} ", startTS); + assertEquals(startTS.longValue(), 1); + + // Close the tsoClient connection and stop the TSO Server + tsoClient.close().get(); + tsoServer.stopAndWait(); + tsoServer = null; + TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000); + LOG.info("TSO Server Stopped"); + + } + + @Test(timeOut = 30_000) + public void testSuccessOfTSOClientReconnectionsToARestartedTSOWithZKPublishing() throws Exception { + + // Start a TSO with HA... 
+ TSOServerConfig config = new TSOServerConfig(); + config.setMaxItems(1000); + config.setPort(tsoPortForTest); + config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid")); + injector = Guice.createInjector(new TSOMockModule(config)); + LOG.info("Starting Initial TSO"); + tsoServer = injector.getInstance(TSOServer.class); + tsoServer.startAndWait(); + TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100); + LOG.info("Finished loading TSO"); + + waitTillTsoRegisters(injector.getInstance(CuratorFramework.class)); + + // Then create the TSO Client under test... + OmidClientConfiguration tsoClientConf = new OmidClientConfiguration(); + tsoClientConf.setConnectionType(OmidClientConfiguration.ConnType.HA); + tsoClientConf.setConnectionString(zkClusterForTest); + tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH); + TSOClient tsoClient = TSOClient.newInstance(tsoClientConf); + + // ... and check that initially we get responses from the methods + Long startTS = tsoClient.getNewStartTimestamp().get(); + LOG.info("Start TS {} ", startTS); + assertEquals(startTS.longValue(), 1); + + // Then stop the server... + tsoServer.stopAndWait(); + tsoServer = null; + TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000); + LOG.info("Initial TSO Server Stopped"); + + Thread.sleep(1500); // ...allow the client to receive disconnection event... + // ... 
and check that we get a conn exception when trying to access the client + try { + startTS = tsoClient.getNewStartTimestamp().get(); + fail(); + } catch (ExecutionException e) { + LOG.info("Exception expected"); + // Internal accessor to fsm to do the required checkings + FsmImpl fsm = (FsmImpl) tsoClient.fsm; + assertEquals(e.getCause().getClass(), ConnectionException.class); + assertTrue(fsm.getState().getClass().equals(TSOClient.ConnectionFailedState.class) + || + fsm.getState().getClass().equals(TSOClient.DisconnectedState.class)); + } + + // After that, simulate that a new TSO has been launched... + Injector newInjector = Guice.createInjector(new TSOMockModule(config)); + LOG.info("Re-Starting again the TSO"); + tsoServer = newInjector.getInstance(TSOServer.class); + tsoServer.startAndWait(); + TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100); + LOG.info("Finished loading restarted TSO"); + + // Finally re-check that, eventually, we can get a new value from the new TSO... 
+ boolean reconnectionActive = false; + while (!reconnectionActive) { + try { + startTS = tsoClient.getNewStartTimestamp().get(); + reconnectionActive = true; + } catch (ExecutionException e) { + // Expected + } + } + assertNotNull(startTS); + + // ...and stop the server + tsoServer.stopAndWait(); + TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000); + LOG.info("Restarted TSO Server Stopped"); + } + + private void waitTillTsoRegisters(CuratorFramework zkClient) throws Exception { + while (true) { + try { + Stat stat = zkClient.checkExists().forPath(CURRENT_TSO_PATH); + if (stat == null) { + continue; + } + LOG.info("TSO registered in HA with path {}={}", CURRENT_TSO_PATH, stat.toString()); + if (stat.toString().length() == 0) { + continue; + } + return; + } catch (Exception e) { + LOG.debug("TSO still has not registered yet, sleeping...", e); + Thread.sleep(500); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java ---------------------------------------------------------------------- diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java new file mode 100644 index 0000000..44c4858 --- /dev/null +++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.java @@ -0,0 +1,423 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.tso.client; + +import com.google.common.collect.Sets; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import org.apache.omid.TestUtils; +import org.apache.omid.committable.CommitTable; +import org.apache.omid.proto.TSOProto; +import org.apache.omid.tso.PausableTimestampOracle; +import org.apache.omid.tso.TSOMockModule; +import org.apache.omid.tso.TSOServer; +import org.apache.omid.tso.TSOServerConfig; +import org.apache.omid.tso.TimestampOracle; +import org.apache.omid.tso.util.DummyCellIdImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +public class TestTSOClientRequestAndResponseBehaviours { + + private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientRequestAndResponseBehaviours.class); + + private static final String TSO_SERVER_HOST = "localhost"; + private static final int TSO_SERVER_PORT = 1234; + + private final static CellId c1 = new 
DummyCellIdImpl(0xdeadbeefL); + private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL); + + private final static Set<CellId> testWriteSet = Sets.newHashSet(c1, c2); + + private OmidClientConfiguration tsoClientConf; + + // Required infrastructure for TSOClient test + private TSOServer tsoServer; + private PausableTimestampOracle pausableTSOracle; + private CommitTable commitTable; + + @BeforeClass + public void setup() throws Exception { + + TSOServerConfig tsoConfig = new TSOServerConfig(); + tsoConfig.setMaxItems(1000); + tsoConfig.setPort(TSO_SERVER_PORT); + Module tsoServerMockModule = new TSOMockModule(tsoConfig); + Injector injector = Guice.createInjector(tsoServerMockModule); + + LOG.info("=================================================================================================="); + LOG.info("======================================= Init TSO Server =========================================="); + LOG.info("=================================================================================================="); + + tsoServer = injector.getInstance(TSOServer.class); + tsoServer.startAndWait(); + TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100); + + LOG.info("=================================================================================================="); + LOG.info("===================================== TSO Server Initialized ====================================="); + LOG.info("=================================================================================================="); + + pausableTSOracle = (PausableTimestampOracle) injector.getInstance(TimestampOracle.class); + commitTable = injector.getInstance(CommitTable.class); + + } + + @AfterClass + public void tearDown() throws Exception { + + tsoServer.stopAndWait(); + tsoServer = null; + TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000); + + } + + @BeforeMethod + public void beforeMethod() { + OmidClientConfiguration tsoClientConf = new 
OmidClientConfiguration(); + tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT); + + this.tsoClientConf = tsoClientConf; + + } + + @AfterMethod + public void afterMethod() { + + pausableTSOracle.resume(); + + } + + /** + * Test to ensure TSOClient timeouts are cancelled. + * At some point a bug was detected because the TSOClient timeouts were not cancelled, and as timestamp requests + * had no way to be correlated to timestamp responses, random requests were just timed out after a certain time. + * We send a lot of timestamp requests, and wait for them to complete. + * Ensure that the next request doesn't get hit by the timeouts of the previous + * requests. (i.e. make sure we cancel timeouts) + */ + @Test(timeOut = 30_000) + public void testTimeoutsAreCancelled() throws Exception { + + TSOClient client = TSOClient.newInstance(tsoClientConf); + int requestTimeoutInMs = 500; + int requestMaxRetries = 5; + LOG.info("Request timeout {} ms; Max retries {}", requestTimeoutInMs, requestMaxRetries); + Future<Long> f = null; + for (int i = 0; i < (requestMaxRetries * 10); i++) { + f = client.getNewStartTimestamp(); + } + if (f != null) { + f.get(); + } + pausableTSOracle.pause(); + long msToSleep = ((long) (requestTimeoutInMs * 0.75)); + LOG.info("Sleeping for {} ms", msToSleep); + TimeUnit.MILLISECONDS.sleep(msToSleep); + f = client.getNewStartTimestamp(); + msToSleep = ((long) (requestTimeoutInMs * 0.9)); + LOG.info("Sleeping for {} ms", msToSleep); + TimeUnit.MILLISECONDS.sleep(msToSleep); + LOG.info("Resuming"); + pausableTSOracle.resume(); + f.get(); + + } + + @Test(timeOut = 30_000) + public void testCommitGetsServiceUnavailableExceptionWhenCommunicationFails() throws Exception { + + OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration(); + testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT); + testTSOClientConf.setRequestMaxRetries(0); + TSOClient client = TSOClient.newInstance(testTSOClientConf); 
+ + List<Long> startTimestamps = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + startTimestamps.add(client.getNewStartTimestamp().get()); + } + + pausableTSOracle.pause(); + + List<Future<Long>> futures = new ArrayList<>(); + for (long s : startTimestamps) { + futures.add(client.commit(s, Sets.<CellId>newHashSet())); + } + TSOClientAccessor.closeChannel(client); + + for (Future<Long> f : futures) { + try { + f.get(); + fail("Shouldn't be able to complete"); + } catch (ExecutionException ee) { + assertTrue(ee.getCause() instanceof ServiceUnavailableException, + "Should be a service unavailable exception"); + } + } + } + + /** + * Test that if a client tries to make a request without handshaking, it will be disconnected. + */ + @Test(timeOut = 30_000) + public void testHandshakeBetweenOldClientAndCurrentServer() throws Exception { + + TSOClientRaw raw = new TSOClientRaw(TSO_SERVER_HOST, TSO_SERVER_PORT); + + TSOProto.Request request = TSOProto.Request.newBuilder() + .setTimestampRequest(TSOProto.TimestampRequest.newBuilder().build()) + .build(); + raw.write(request); + try { + raw.getResponse().get(); + fail("Channel should be closed"); + } catch (ExecutionException ee) { + assertEquals(ee.getCause().getClass(), ConnectionException.class, "Should be channel closed exception"); + } + raw.close(); + + } + + // ---------------------------------------------------------------------------------------------------------------- + // Test duplicate commits + // ---------------------------------------------------------------------------------------------------------------- + + /** + * This tests the case where messages arrive at the TSO out of order. This can happen in the case + * the channel get dropped and the retry is done in a new channel. However, the TSO will respond with + * aborted to the original message because the retry was already committed and it would be prohibitively + * expensive to check all non-retry requests to see if they are already committed. 
For this reason + * a client must ensure that if it is sending a retry due to a socket error, the previous channel + * must be entirely closed so that it will not actually receive the abort response. TCP guarantees + * that this doesn't happen in non-socket error cases. + * + */ + @Test(timeOut = 30_000) + public void testOutOfOrderMessages() throws Exception { + + TSOClient client = TSOClient.newInstance(tsoClientConf); + TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT); + + long ts1 = client.getNewStartTimestamp().get(); + + TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet)); + TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet)); + assertFalse(response1.getCommitResponse().getAborted(), "Retry Transaction should commit"); + assertTrue(response2.getCommitResponse().getAborted(), "Transaction should abort"); + } + + @Test(timeOut = 30_000) + public void testDuplicateCommitAborting() throws Exception { + + TSOClient client = TSOClient.newInstance(tsoClientConf); + TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT); + + long ts1 = client.getNewStartTimestamp().get(); + long ts2 = client.getNewStartTimestamp().get(); + client.commit(ts2, testWriteSet).get(); + + TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet)); + TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet)); + assertTrue(response1.getCommitResponse().getAborted(), "Transaction should abort"); + assertTrue(response2.getCommitResponse().getAborted(), "Retry commit should abort"); + } + + @Test(timeOut = 30_000) + public void testDuplicateCommit() throws Exception { + + TSOClient client = TSOClient.newInstance(tsoClientConf); + TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT); + + long ts1 = 
client.getNewStartTimestamp().get(); + + TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet)); + TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet)); + assertEquals(response2.getCommitResponse().getCommitTimestamp(), + response1.getCommitResponse().getCommitTimestamp(), + "Commit timestamp should be the same"); + } + + // ---------------------------------------------------------------------------------------------------------------- + // Test TSOClient retry behaviour + // ---------------------------------------------------------------------------------------------------------------- + + @Test(timeOut = 30_000) + public void testCommitCanSucceedWhenChannelDisconnected() throws Exception { + + TSOClient client = TSOClient.newInstance(tsoClientConf); + + long ts1 = client.getNewStartTimestamp().get(); + pausableTSOracle.pause(); + TSOFuture<Long> future = client.commit(ts1, testWriteSet); + TSOClientAccessor.closeChannel(client); + pausableTSOracle.resume(); + future.get(); + + } + + @Test(timeOut = 30_000) + public void testCommitCanSucceedWithMultipleTimeouts() throws Exception { + + OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration(); + testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT); + testTSOClientConf.setRequestTimeoutInMs(100); + testTSOClientConf.setRequestMaxRetries(10000); + TSOClient client = TSOClient.newInstance(testTSOClientConf); + + long ts1 = client.getNewStartTimestamp().get(); + pausableTSOracle.pause(); + TSOFuture<Long> future = client.commit(ts1, testWriteSet); + TimeUnit.SECONDS.sleep(1); + pausableTSOracle.resume(); + future.get(); + } + + @Test(timeOut = 30_000) + public void testCommitFailWhenTSOIsDown() throws Exception { + + OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration(); + testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT); + 
testTSOClientConf.setRequestTimeoutInMs(100); + testTSOClientConf.setRequestMaxRetries(10); + TSOClient client = TSOClient.newInstance(testTSOClientConf); + + long ts1 = client.getNewStartTimestamp().get(); + pausableTSOracle.pause(); + TSOFuture<Long> future = client.commit(ts1, testWriteSet); + try { + future.get(); + } catch (ExecutionException e) { + assertEquals(e.getCause().getClass(), ServiceUnavailableException.class, + "Should be a ServiceUnavailableExeption"); + } + + } + + @Test(timeOut = 30_000) + public void testTimestampRequestSucceedWithMultipleTimeouts() throws Exception { + + OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration(); + testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT); + testTSOClientConf.setRequestTimeoutInMs(100); + testTSOClientConf.setRequestMaxRetries(10000); + TSOClient client = TSOClient.newInstance(testTSOClientConf); + + pausableTSOracle.pause(); + Future<Long> future = client.getNewStartTimestamp(); + TimeUnit.SECONDS.sleep(1); + pausableTSOracle.resume(); + future.get(); + + } + + // ---------------------------------------------------------------------------------------------------------------- + // The next 3 tests are similar to the ones in TestRetryProcessor but checking the result on the TSOClient side + // (They exercise the communication protocol) TODO Remove??? 
+ // ---------------------------------------------------------------------------------------------------------------- + @Test + public void testCommitTimestampPresentInCommitTableReturnsCommit() throws Exception { + + TSOClient client = TSOClient.newInstance(tsoClientConf); + TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT); + + long tx1ST = client.getNewStartTimestamp().get(); + + clientOneShot.makeRequest(createRetryCommitRequest(tx1ST)); + TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST)); + assertFalse(response.getCommitResponse().getAborted(), "Transaction should be committed"); + assertFalse(response.getCommitResponse().getMakeHeuristicDecision()); + assertEquals(response.getCommitResponse().getCommitTimestamp(), tx1ST + 1); + } + + @Test + public void testInvalidCommitTimestampPresentInCommitTableReturnsAbort() throws Exception { + + TSOClient client = TSOClient.newInstance(tsoClientConf); + TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT); + + long tx1ST = client.getNewStartTimestamp().get(); + // Invalidate the transaction + commitTable.getClient().tryInvalidateTransaction(tx1ST); + + clientOneShot.makeRequest(createRetryCommitRequest(tx1ST)); + TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST)); + assertTrue(response.getCommitResponse().getAborted(), "Transaction should be aborted"); + assertFalse(response.getCommitResponse().getMakeHeuristicDecision()); + assertEquals(response.getCommitResponse().getCommitTimestamp(), 0); + } + + @Test + public void testCommitTimestampNotPresentInCommitTableReturnsAnAbort() throws Exception { + + TSOClient client = TSOClient.newInstance(tsoClientConf); + TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT); + + long tx1ST = client.getNewStartTimestamp().get(); + + clientOneShot.makeRequest(createRetryCommitRequest(tx1ST)); + + // 
Simulate remove entry from the commit table before exercise retry + commitTable.getClient().completeTransaction(tx1ST); + + TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST)); + assertTrue(response.getCommitResponse().getAborted(), "Transaction should abort"); + assertFalse(response.getCommitResponse().getMakeHeuristicDecision()); + assertEquals(response.getCommitResponse().getCommitTimestamp(), 0); + } + // ---------------------------------------------------------------------------------------------------------------- + // The previous 3 tests are similar to the ones in TestRetryProcessor but checking the result on the TSOClient side + // (They exercise the communication protocol) TODO Remove??? + // ---------------------------------------------------------------------------------------------------------------- + + // ---------------------------------------------------------------------------------------------------------------- + // Helper methods + // ---------------------------------------------------------------------------------------------------------------- + + private TSOProto.Request createRetryCommitRequest(long ts) { + return createCommitRequest(ts, true, testWriteSet); + } + + private TSOProto.Request createCommitRequest(long ts, boolean retry, Set<CellId> writeSet) { + TSOProto.Request.Builder builder = TSOProto.Request.newBuilder(); + TSOProto.CommitRequest.Builder commitBuilder = TSOProto.CommitRequest.newBuilder(); + commitBuilder.setStartTimestamp(ts); + commitBuilder.setIsRetry(retry); + for (CellId cell : writeSet) { + commitBuilder.addCellId(cell.getCellId()); + } + return builder.setCommitRequest(commitBuilder.build()).build(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientResponseHandling.java ---------------------------------------------------------------------- diff --git 
a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientResponseHandling.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientResponseHandling.java new file mode 100644 index 0000000..cf05a9a --- /dev/null +++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientResponseHandling.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.omid.tso.client; + +import org.apache.omid.tso.ProgrammableTSOServer; +import org.apache.omid.tso.ProgrammableTSOServer.AbortResponse; +import org.apache.omid.tso.ProgrammableTSOServer.CommitResponse; +import org.apache.omid.tso.ProgrammableTSOServer.TimestampResponse; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.ExecutionException; + +import static org.testng.Assert.assertEquals; + +public class TestTSOClientResponseHandling { + + private static final int TSO_PORT = 4321; + private static final long START_TS = 1L; + private static final long COMMIT_TS = 2L; + + private ProgrammableTSOServer tsoServer = new ProgrammableTSOServer(TSO_PORT); + // Client under test + private TSOClient tsoClient; + + @BeforeClass + public void configureAndCreateClient() throws IOException, InterruptedException { + + OmidClientConfiguration tsoClientConf = new OmidClientConfiguration(); + tsoClientConf.setConnectionString("localhost:" + TSO_PORT); + tsoClient = TSOClient.newInstance(tsoClientConf); + } + + @BeforeMethod + public void reset() { + tsoServer.cleanResponses(); + } + + @Test + public void testTimestampRequestReceivingASuccessfulResponse() throws Exception { + // test request timestamp response returns a timestamp + + // Program the TSO to return an ad-hoc Timestamp response + tsoServer.queueResponse(new TimestampResponse(START_TS)); + + long startTS = tsoClient.getNewStartTimestamp().get(); + assertEquals(startTS, START_TS); + } + + @Test + public void testCommitRequestReceivingAnAbortResponse() throws Exception { + // test commit request which is aborted on the server side + // (e.g. 
due to conflicts with other transaction) throws an + // execution exception with an AbortException as a cause + + // Program the TSO to return an Abort response + tsoServer.queueResponse(new AbortResponse(START_TS)); + + try { + tsoClient.commit(START_TS, Collections.<CellId>emptySet()).get(); + } catch (ExecutionException ee) { + assertEquals(ee.getCause().getClass(), AbortException.class); + } + } + + @Test + public void testCommitRequestReceivingASuccessfulResponse() throws Exception { + // test commit request which is successfully committed on the server + // side returns a commit timestamp + + // Program the TSO to return an Commit response (with no required heuristic actions) + tsoServer.queueResponse(new CommitResponse(false, START_TS, COMMIT_TS)); + + long commitTS = tsoClient.commit(START_TS, Collections.<CellId>emptySet()).get(); + assertEquals(commitTS, COMMIT_TS); + } + + @Test + public void testCommitRequestReceivingAHeuristicResponse() throws Exception { + // test commit request which needs heuristic actions from the client + // throws an execution exception with a NewTSOException as a cause + + // Program the TSO to return an Commit response requiring heuristic actions + tsoServer.queueResponse(new CommitResponse(true, START_TS, COMMIT_TS)); + try { + tsoClient.commit(START_TS, Collections.<CellId>emptySet()).get(); + } catch (ExecutionException ee) { + assertEquals(ee.getCause().getClass(), NewTSOException.class); + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/client/TestUnconnectedTSOClient.java ---------------------------------------------------------------------- diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestUnconnectedTSOClient.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestUnconnectedTSOClient.java new file mode 100644 index 0000000..883d45f --- /dev/null +++ 
b/tso-server/src/test/java/org/apache/omid/tso/client/TestUnconnectedTSOClient.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.tso.client; + +import org.apache.omid.tso.util.DummyCellIdImpl; +import org.apache.statemachine.StateMachine.FsmImpl; +import org.slf4j.Logger; +import org.testng.annotations.Test; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static com.google.common.collect.Sets.newHashSet; +import static org.slf4j.LoggerFactory.getLogger; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +/** + * Test the behavior of requests on a TSOClient component that is not connected to a TSO server. 
+ */ +public class TestUnconnectedTSOClient { + + private static final Logger LOG = getLogger(TestUnconnectedTSOClient.class); + + private static final int TSO_RECONNECTION_DELAY_IN_SECS_FOR_TEST = 2; + + @Test(timeOut = 30_000) // 30 secs + public void testRequestsDoneOnAnUnconnectedTSOClientAlwaysReturn() throws Exception { + + OmidClientConfiguration tsoClientConf = new OmidClientConfiguration(); + tsoClientConf.setConnectionString("localhost:12345"); + tsoClientConf.setReconnectionDelayInSecs(TSO_RECONNECTION_DELAY_IN_SECS_FOR_TEST); + + // Component under test + TSOClient tsoClient = TSOClient.newInstance(tsoClientConf); + + // Internal accessor to fsm + FsmImpl fsm = (FsmImpl) tsoClient.fsm; + + assertEquals(fsm.getState().getClass(), TSOClient.DisconnectedState.class); + + // Test requests to the 3 relevant methods in TSO client + + try { + tsoClient.getNewStartTimestamp().get(); + fail(); + } catch (ExecutionException e) { + LOG.info("Exception expected"); + assertEquals(e.getCause().getClass(), ConnectionException.class); + TimeUnit.SECONDS.sleep(TSO_RECONNECTION_DELAY_IN_SECS_FOR_TEST * 2); + assertEquals(fsm.getState().getClass(), TSOClient.DisconnectedState.class); + } + + try { + tsoClient.commit(1, newHashSet(new DummyCellIdImpl(0xdeadbeefL))).get(); + fail(); + } catch (ExecutionException e) { + LOG.info("Exception expected"); + assertEquals(e.getCause().getClass(), ConnectionException.class); + TimeUnit.SECONDS.sleep(TSO_RECONNECTION_DELAY_IN_SECS_FOR_TEST * 2); + assertEquals(fsm.getState().getClass(), TSOClient.DisconnectedState.class); + } + + tsoClient.close().get(); + LOG.info("No exception expected"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/tso-server/src/test/resources/log4j.properties b/tso-server/src/test/resources/log4j.properties index 8273243..5f7911e 100644 --- 
a/tso-server/src/test/resources/log4j.properties +++ b/tso-server/src/test/resources/log4j.properties @@ -40,14 +40,13 @@ log4j.logger.org.apache.zookeeper=ERROR log4j.logger.org.apache.bookkeeper=FATAL log4j.logger.org.apache.hadoop.hbase=ERROR log4j.logger.org.apache.hadoop.ipc=ERROR - -log4j.logger.com.yahoo.omid=INFO -#log4j.logger.com.yahoo.omid.regionserver.TransactionalRegionServer=TRACE -#log4j.logger.com.yahoo.omid.TestBasicTransaction=TRACE -#log4j.logger.com.yahoo.omid.client.TSOClient=TRACE -#log4j.logger.com.yahoo.omid.client.TransactionState=TRACE -#log4j.logger.com.yahoo.omid.OmidTestBase=TRACE -#log4j.logger.com.yahoo.omid.tso.ThroughputMonitor=INFO +log4j.logger.org.apache.omid=INFO +#log4j.logger.org.apache.omid.regionserver.TransactionalRegionServer=TRACE +#log4j.logger.org.apache.omid.TestBasicTransaction=TRACE +#log4j.logger.org.apache.omid.client.TSOClient=TRACE +#log4j.logger.org.apache.omid.client.TransactionState=TRACE +#log4j.logger.org.apache.omid.OmidTestBase=TRACE +#log4j.logger.org.apache.omid.tso.ThroughputMonitor=INFO #log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG # Make these two classes INFO-level. Make them DEBUG to see more zk debug. 
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/resources/test-omid.yml ---------------------------------------------------------------------- diff --git a/tso-server/src/test/resources/test-omid.yml b/tso-server/src/test/resources/test-omid.yml index 9db7bd6..4729bb1 100644 --- a/tso-server/src/test/resources/test-omid.yml +++ b/tso-server/src/test/resources/test-omid.yml @@ -9,13 +9,13 @@ maxBatchSize: 500 batchPersistTimeoutInMs: 100 networkIfaceName: eth1 -commitTableStoreModule: !!com.yahoo.omid.committable.hbase.DefaultHBaseCommitTableStorageModule +commitTableStoreModule: !!org.apache.omid.committable.hbase.DefaultHBaseCommitTableStorageModule tableName: "sieve_omid:OMID_TIMESTAMP_F" -timestampStoreModule: !!com.yahoo.omid.timestamp.storage.DefaultHBaseTimestampStorageModule +timestampStoreModule: !!org.apache.omid.timestamp.storage.DefaultHBaseTimestampStorageModule tableName: "sieve_omid:OMID_COMMIT_TABLE_F" familyName: "MAX_TIMESTAMP_F" -leaseModule: !!com.yahoo.omid.tso.VoidLeaseManagementModule [ ] +leaseModule: !!org.apache.omid.tso.VoidLeaseManagementModule [ ] -metrics: !!com.yahoo.omid.metrics.NullMetricsProvider [ ] \ No newline at end of file +metrics: !!org.apache.omid.metrics.NullMetricsProvider [ ] \ No newline at end of file