http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/tso-server/src/test/java/com/yahoo/omid/tsoclient/TestTSOClientConnectionToTSO.java ---------------------------------------------------------------------- diff --git a/tso-server/src/test/java/com/yahoo/omid/tsoclient/TestTSOClientConnectionToTSO.java b/tso-server/src/test/java/com/yahoo/omid/tsoclient/TestTSOClientConnectionToTSO.java deleted file mode 100644 index d41517c..0000000 --- a/tso-server/src/test/java/com/yahoo/omid/tsoclient/TestTSOClientConnectionToTSO.java +++ /dev/null @@ -1,285 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.yahoo.omid.tsoclient; - -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.yahoo.omid.TestUtils; -import com.yahoo.omid.tso.HALeaseManagementModule; -import com.yahoo.omid.tso.TSOMockModule; -import com.yahoo.omid.tso.TSOServer; -import com.yahoo.omid.tso.TSOServerConfig; -import com.yahoo.omid.tso.VoidLeaseManagementModule; -import com.yahoo.statemachine.StateMachine.FsmImpl; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.test.TestingServer; -import org.apache.curator.utils.CloseableUtils; -import org.apache.zookeeper.KeeperException.NoNodeException; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import java.util.concurrent.ExecutionException; - -import static com.yahoo.omid.tsoclient.OmidClientConfiguration.ConnType.HA; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; - -public class TestTSOClientConnectionToTSO { - - private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientConnectionToTSO.class); - - // Constants and variables for component connectivity - private static final String TSO_HOST = "localhost"; - private static final String CURRENT_TSO_PATH = "/current_tso_path"; - private static final String TSO_LEASE_PATH = "/tso_lease_path"; - - private int tsoPortForTest; - private String zkClusterForTest; - - private Injector injector = null; - - private TestingServer zkServer; - - private CuratorFramework zkClient; - private TSOServer tsoServer; - - @BeforeMethod - public void beforeMethod() throws Exception { - - tsoPortForTest = TestUtils.getFreeLocalPort(); - - int zkPortForTest = TestUtils.getFreeLocalPort(); - zkClusterForTest = TSO_HOST + ":" + zkPortForTest; - LOG.info("Starting ZK Server in port {}", zkPortForTest); - zkServer = TestUtils.provideTestingZKServer(zkPortForTest); - LOG.info("ZK Server Started @ {}", zkServer.getConnectString()); - - zkClient = TestUtils.provideConnectedZKClient(zkClusterForTest); - - Stat stat; - try { - zkClient.delete().forPath(CURRENT_TSO_PATH); - stat = zkClient.checkExists().forPath(CURRENT_TSO_PATH); - assertNull(stat, CURRENT_TSO_PATH + " should not exist"); - } catch (NoNodeException e) { - LOG.info("{} ZNode did not exist", CURRENT_TSO_PATH); - } - - } - - @AfterMethod - public void afterMethod() { - - zkClient.close(); - - CloseableUtils.closeQuietly(zkServer); - zkServer = null; - LOG.info("ZK Server Stopped"); - - } - - @Test(timeOut = 30_000) - public void testUnsuccessfulConnectionToTSO() throws Exception { - - // When no HA node for TSOServer is found & no host:port config exists - // we should get an exception when getting the client - try { - TSOClient.newInstance(new OmidClientConfiguration()); - } catch (IllegalArgumentException e) { - // Expected - } - - } - - @Test(timeOut = 30_000) - public void testSuccessfulConnectionToTSOWithHostAndPort() throws Exception { - - // Launch a TSO WITHOUT publishing the address in HA... - TSOServerConfig tsoConfig = new TSOServerConfig(); - tsoConfig.setMaxItems(1000); - tsoConfig.setPort(tsoPortForTest); - tsoConfig.setLeaseModule(new VoidLeaseManagementModule()); - injector = Guice.createInjector(new TSOMockModule(tsoConfig)); - LOG.info("Starting TSO"); - tsoServer = injector.getInstance(TSOServer.class); - tsoServer.startAndWait(); - TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100); - LOG.info("Finished loading TSO"); - - // When no HA node for TSOServer is found we should get a connection - // to the TSO through the host:port configured... - OmidClientConfiguration tsoClientConf = new OmidClientConfiguration(); - tsoClientConf.setConnectionString("localhost:" + tsoPortForTest); - tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH); - TSOClient tsoClient = TSOClient.newInstance(tsoClientConf); - - // ... so we should get responses from the methods - Long startTS = tsoClient.getNewStartTimestamp().get(); - LOG.info("Start TS {} ", startTS); - assertEquals(startTS.longValue(), 1); - - // Close the tsoClient connection and stop the TSO Server - tsoClient.close().get(); - tsoServer.stopAndWait(); - tsoServer = null; - TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000); - LOG.info("TSO Server Stopped"); - - } - - @Test(timeOut = 30_000) - public void testSuccessfulConnectionToTSOThroughZK() throws Exception { - - // Launch a TSO publishing the address in HA... - TSOServerConfig config = new TSOServerConfig(); - config.setMaxItems(1000); - config.setPort(tsoPortForTest); - config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid")); - injector = Guice.createInjector(new TSOMockModule(config)); - LOG.info("Starting TSO"); - tsoServer = injector.getInstance(TSOServer.class); - tsoServer.startAndWait(); - TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100); - LOG.info("Finished loading TSO"); - - waitTillTsoRegisters(injector.getInstance(CuratorFramework.class)); - - // When a HA node for TSOServer is found we should get a connection - OmidClientConfiguration tsoClientConf = new OmidClientConfiguration(); - tsoClientConf.setConnectionType(HA); - tsoClientConf.setConnectionString(zkClusterForTest); - tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH); - TSOClient tsoClient = TSOClient.newInstance(tsoClientConf); - - // ... so we should get responses from the methods - Long startTS = tsoClient.getNewStartTimestamp().get(); - LOG.info("Start TS {} ", startTS); - assertEquals(startTS.longValue(), 1); - - // Close the tsoClient connection and stop the TSO Server - tsoClient.close().get(); - tsoServer.stopAndWait(); - tsoServer = null; - TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000); - LOG.info("TSO Server Stopped"); - - } - - @Test(timeOut = 30_000) - public void testSuccessOfTSOClientReconnectionsToARestartedTSOWithZKPublishing() throws Exception { - - // Start a TSO with HA... - TSOServerConfig config = new TSOServerConfig(); - config.setMaxItems(1000); - config.setPort(tsoPortForTest); - config.setLeaseModule(new HALeaseManagementModule(1000, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkClusterForTest, "omid")); - injector = Guice.createInjector(new TSOMockModule(config)); - LOG.info("Starting Initial TSO"); - tsoServer = injector.getInstance(TSOServer.class); - tsoServer.startAndWait(); - TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100); - LOG.info("Finished loading TSO"); - - waitTillTsoRegisters(injector.getInstance(CuratorFramework.class)); - - // Then create the TSO Client under test... - OmidClientConfiguration tsoClientConf = new OmidClientConfiguration(); - tsoClientConf.setConnectionType(HA); - tsoClientConf.setConnectionString(zkClusterForTest); - tsoClientConf.setZkCurrentTsoPath(CURRENT_TSO_PATH); - TSOClient tsoClient = TSOClient.newInstance(tsoClientConf); - - // ... and check that initially we get responses from the methods - Long startTS = tsoClient.getNewStartTimestamp().get(); - LOG.info("Start TS {} ", startTS); - assertEquals(startTS.longValue(), 1); - - // Then stop the server... - tsoServer.stopAndWait(); - tsoServer = null; - TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000); - LOG.info("Initial TSO Server Stopped"); - - Thread.sleep(1500); // ...allow the client to receive disconnection event... - // ... and check that we get a conn exception when trying to access the client - try { - startTS = tsoClient.getNewStartTimestamp().get(); - fail(); - } catch (ExecutionException e) { - LOG.info("Exception expected"); - // Internal accessor to fsm to do the required checkings - FsmImpl fsm = (FsmImpl) tsoClient.fsm; - assertEquals(e.getCause().getClass(), ConnectionException.class); - assertTrue(fsm.getState().getClass().equals(TSOClient.ConnectionFailedState.class) - || - fsm.getState().getClass().equals(TSOClient.DisconnectedState.class)); - } - - // After that, simulate that a new TSO has been launched... - Injector newInjector = Guice.createInjector(new TSOMockModule(config)); - LOG.info("Re-Starting again the TSO"); - tsoServer = newInjector.getInstance(TSOServer.class); - tsoServer.startAndWait(); - TestUtils.waitForSocketListening(TSO_HOST, tsoPortForTest, 100); - LOG.info("Finished loading restarted TSO"); - - // Finally re-check that, eventually, we can get a new value from the new TSO... - boolean reconnectionActive = false; - while (!reconnectionActive) { - try { - startTS = tsoClient.getNewStartTimestamp().get(); - reconnectionActive = true; - } catch (ExecutionException e) { - // Expected - } - } - assertNotNull(startTS); - - // ...and stop the server - tsoServer.stopAndWait(); - TestUtils.waitForSocketNotListening(TSO_HOST, tsoPortForTest, 1000); - LOG.info("Restarted TSO Server Stopped"); - } - - private void waitTillTsoRegisters(CuratorFramework zkClient) throws Exception { - while (true) { - try { - Stat stat = zkClient.checkExists().forPath(CURRENT_TSO_PATH); - if (stat == null) { - continue; - } - LOG.info("TSO registered in HA with path {}={}", CURRENT_TSO_PATH, stat.toString()); - if (stat.toString().length() == 0) { - continue; - } - return; - } catch (Exception e) { - LOG.debug("TSO still has not registered yet, sleeping...", e); - Thread.sleep(500); - } - } - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/tso-server/src/test/java/com/yahoo/omid/tsoclient/TestTSOClientRequestAndResponseBehaviours.java ---------------------------------------------------------------------- diff --git a/tso-server/src/test/java/com/yahoo/omid/tsoclient/TestTSOClientRequestAndResponseBehaviours.java b/tso-server/src/test/java/com/yahoo/omid/tsoclient/TestTSOClientRequestAndResponseBehaviours.java deleted file mode 100644 index 33b12a9..0000000 --- a/tso-server/src/test/java/com/yahoo/omid/tsoclient/TestTSOClientRequestAndResponseBehaviours.java +++ /dev/null @@ -1,423 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.yahoo.omid.tsoclient; - -import com.google.common.collect.Sets; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.google.inject.Module; -import com.yahoo.omid.TestUtils; -import com.yahoo.omid.committable.CommitTable; -import com.yahoo.omid.proto.TSOProto; -import com.yahoo.omid.tso.PausableTimestampOracle; -import com.yahoo.omid.tso.TSOMockModule; -import com.yahoo.omid.tso.TSOServer; -import com.yahoo.omid.tso.TSOServerConfig; -import com.yahoo.omid.tso.TimestampOracle; -import com.yahoo.omid.tso.util.DummyCellIdImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.AfterClass; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; - -public class TestTSOClientRequestAndResponseBehaviours { - - private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientRequestAndResponseBehaviours.class); - - private static final String TSO_SERVER_HOST = "localhost"; - private static final int TSO_SERVER_PORT = 1234; - - private final static CellId c1 = new DummyCellIdImpl(0xdeadbeefL); - private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL); - - private final static Set<CellId> testWriteSet = Sets.newHashSet(c1, c2); - - private OmidClientConfiguration tsoClientConf; - - // Required infrastructure for TSOClient test - private TSOServer tsoServer; - private PausableTimestampOracle pausableTSOracle; - private CommitTable commitTable; - - @BeforeClass - public void setup() throws Exception { - - TSOServerConfig tsoConfig = new TSOServerConfig(); - tsoConfig.setMaxItems(1000); - tsoConfig.setPort(TSO_SERVER_PORT); - Module tsoServerMockModule = new TSOMockModule(tsoConfig); - Injector injector = Guice.createInjector(tsoServerMockModule); - - LOG.info("=================================================================================================="); - LOG.info("======================================= Init TSO Server =========================================="); - LOG.info("=================================================================================================="); - - tsoServer = injector.getInstance(TSOServer.class); - tsoServer.startAndWait(); - TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100); - - LOG.info("=================================================================================================="); - LOG.info("===================================== TSO Server Initialized ====================================="); - LOG.info("=================================================================================================="); - - pausableTSOracle = (PausableTimestampOracle) injector.getInstance(TimestampOracle.class); - commitTable = injector.getInstance(CommitTable.class); - - } - - @AfterClass - public void tearDown() throws Exception { - - tsoServer.stopAndWait(); - tsoServer = null; - TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000); - - } - - @BeforeMethod - public void beforeMethod() { - OmidClientConfiguration tsoClientConf = new OmidClientConfiguration(); - tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT); - - this.tsoClientConf = tsoClientConf; - - } - - @AfterMethod - public void afterMethod() { - - pausableTSOracle.resume(); - - } - - /** - * Test to ensure TSOClient timeouts are cancelled. - * At some point a bug was detected because the TSOClient timeouts were not cancelled, and as timestamp requests - * had no way to be correlated to timestamp responses, random requests were just timed out after a certain time. - * We send a lot of timestamp requests, and wait for them to complete. - * Ensure that the next request doesn't get hit by the timeouts of the previous - * requests. (i.e. make sure we cancel timeouts) - */ - @Test(timeOut = 30_000) - public void testTimeoutsAreCancelled() throws Exception { - - TSOClient client = TSOClient.newInstance(tsoClientConf); - int requestTimeoutInMs = 500; - int requestMaxRetries = 5; - LOG.info("Request timeout {} ms; Max retries {}", requestTimeoutInMs, requestMaxRetries); - Future<Long> f = null; - for (int i = 0; i < (requestMaxRetries * 10); i++) { - f = client.getNewStartTimestamp(); - } - if (f != null) { - f.get(); - } - pausableTSOracle.pause(); - long msToSleep = ((long) (requestTimeoutInMs * 0.75)); - LOG.info("Sleeping for {} ms", msToSleep); - TimeUnit.MILLISECONDS.sleep(msToSleep); - f = client.getNewStartTimestamp(); - msToSleep = ((long) (requestTimeoutInMs * 0.9)); - LOG.info("Sleeping for {} ms", msToSleep); - TimeUnit.MILLISECONDS.sleep(msToSleep); - LOG.info("Resuming"); - pausableTSOracle.resume(); - f.get(); - - } - - @Test(timeOut = 30_000) - public void testCommitGetsServiceUnavailableExceptionWhenCommunicationFails() throws Exception { - - OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration(); - testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT); - testTSOClientConf.setRequestMaxRetries(0); - TSOClient client = TSOClient.newInstance(testTSOClientConf); - - List<Long> startTimestamps = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - startTimestamps.add(client.getNewStartTimestamp().get()); - } - - pausableTSOracle.pause(); - - List<Future<Long>> futures = new ArrayList<>(); - for (long s : startTimestamps) { - futures.add(client.commit(s, Sets.<CellId>newHashSet())); - } - TSOClientAccessor.closeChannel(client); - - for (Future<Long> f : futures) { - try { - f.get(); - fail("Shouldn't be able to complete"); - } catch (ExecutionException ee) { - assertTrue(ee.getCause() instanceof ServiceUnavailableException, - "Should be a service unavailable exception"); - } - } - } - - /** - * Test that if a client tries to make a request without handshaking, it will be disconnected. - */ - @Test(timeOut = 30_000) - public void testHandshakeBetweenOldClientAndCurrentServer() throws Exception { - - TSOClientRaw raw = new TSOClientRaw(TSO_SERVER_HOST, TSO_SERVER_PORT); - - TSOProto.Request request = TSOProto.Request.newBuilder() - .setTimestampRequest(TSOProto.TimestampRequest.newBuilder().build()) - .build(); - raw.write(request); - try { - raw.getResponse().get(); - fail("Channel should be closed"); - } catch (ExecutionException ee) { - assertEquals(ee.getCause().getClass(), ConnectionException.class, "Should be channel closed exception"); - } - raw.close(); - - } - - // ---------------------------------------------------------------------------------------------------------------- - // Test duplicate commits - // ---------------------------------------------------------------------------------------------------------------- - - /** - * This tests the case where messages arrive at the TSO out of order. This can happen in the case - * the channel get dropped and the retry is done in a new channel. However, the TSO will respond with - * aborted to the original message because the retry was already committed and it would be prohibitively - * expensive to check all non-retry requests to see if they are already committed. For this reason - * a client must ensure that if it is sending a retry due to a socket error, the previous channel - * must be entirely closed so that it will not actually receive the abort response. TCP guarantees - * that this doesn't happen in non-socket error cases. - * - */ - @Test(timeOut = 30_000) - public void testOutOfOrderMessages() throws Exception { - - TSOClient client = TSOClient.newInstance(tsoClientConf); - TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT); - - long ts1 = client.getNewStartTimestamp().get(); - - TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet)); - TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet)); - assertFalse(response1.getCommitResponse().getAborted(), "Retry Transaction should commit"); - assertTrue(response2.getCommitResponse().getAborted(), "Transaction should abort"); - } - - @Test(timeOut = 30_000) - public void testDuplicateCommitAborting() throws Exception { - - TSOClient client = TSOClient.newInstance(tsoClientConf); - TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT); - - long ts1 = client.getNewStartTimestamp().get(); - long ts2 = client.getNewStartTimestamp().get(); - client.commit(ts2, testWriteSet).get(); - - TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet)); - TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet)); - assertTrue(response1.getCommitResponse().getAborted(), "Transaction should abort"); - assertTrue(response2.getCommitResponse().getAborted(), "Retry commit should abort"); - } - - @Test(timeOut = 30_000) - public void testDuplicateCommit() throws Exception { - - TSOClient client = TSOClient.newInstance(tsoClientConf); - TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT); - - long ts1 = client.getNewStartTimestamp().get(); - - TSOProto.Response response1 = clientOneShot.makeRequest(createCommitRequest(ts1, false, testWriteSet)); - TSOProto.Response response2 = clientOneShot.makeRequest(createCommitRequest(ts1, true, testWriteSet)); - assertEquals(response2.getCommitResponse().getCommitTimestamp(), - response1.getCommitResponse().getCommitTimestamp(), - "Commit timestamp should be the same"); - } - - // ---------------------------------------------------------------------------------------------------------------- - // Test TSOClient retry behaviour - // ---------------------------------------------------------------------------------------------------------------- - - @Test(timeOut = 30_000) - public void testCommitCanSucceedWhenChannelDisconnected() throws Exception { - - TSOClient client = TSOClient.newInstance(tsoClientConf); - - long ts1 = client.getNewStartTimestamp().get(); - pausableTSOracle.pause(); - TSOFuture<Long> future = client.commit(ts1, testWriteSet); - TSOClientAccessor.closeChannel(client); - pausableTSOracle.resume(); - future.get(); - - } - - @Test(timeOut = 30_000) - public void testCommitCanSucceedWithMultipleTimeouts() throws Exception { - - OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration(); - testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT); - testTSOClientConf.setRequestTimeoutInMs(100); - testTSOClientConf.setRequestMaxRetries(10000); - TSOClient client = TSOClient.newInstance(testTSOClientConf); - - long ts1 = client.getNewStartTimestamp().get(); - pausableTSOracle.pause(); - TSOFuture<Long> future = client.commit(ts1, testWriteSet); - TimeUnit.SECONDS.sleep(1); - pausableTSOracle.resume(); - future.get(); - } - - @Test(timeOut = 30_000) - public void testCommitFailWhenTSOIsDown() throws Exception { - - OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration(); - testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT); - testTSOClientConf.setRequestTimeoutInMs(100); - testTSOClientConf.setRequestMaxRetries(10); - TSOClient client = TSOClient.newInstance(testTSOClientConf); - - long ts1 = client.getNewStartTimestamp().get(); - pausableTSOracle.pause(); - TSOFuture<Long> future = client.commit(ts1, testWriteSet); - try { - future.get(); - } catch (ExecutionException e) { - assertEquals(e.getCause().getClass(), ServiceUnavailableException.class, - "Should be a ServiceUnavailableExeption"); - } - - } - - @Test(timeOut = 30_000) - public void testTimestampRequestSucceedWithMultipleTimeouts() throws Exception { - - OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration(); - testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT); - testTSOClientConf.setRequestTimeoutInMs(100); - testTSOClientConf.setRequestMaxRetries(10000); - TSOClient client = TSOClient.newInstance(testTSOClientConf); - - pausableTSOracle.pause(); - Future<Long> future = client.getNewStartTimestamp(); - TimeUnit.SECONDS.sleep(1); - pausableTSOracle.resume(); - future.get(); - - } - - // ---------------------------------------------------------------------------------------------------------------- - // The next 3 tests are similar to the ones in TestRetryProcessor but checking the result on the TSOClient side - // (They exercise the communication protocol) TODO Remove??? - // ---------------------------------------------------------------------------------------------------------------- - @Test - public void testCommitTimestampPresentInCommitTableReturnsCommit() throws Exception { - - TSOClient client = TSOClient.newInstance(tsoClientConf); - TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT); - - long tx1ST = client.getNewStartTimestamp().get(); - - clientOneShot.makeRequest(createRetryCommitRequest(tx1ST)); - TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST)); - assertFalse(response.getCommitResponse().getAborted(), "Transaction should be committed"); - assertFalse(response.getCommitResponse().getMakeHeuristicDecision()); - assertEquals(response.getCommitResponse().getCommitTimestamp(), tx1ST + 1); - } - - @Test - public void testInvalidCommitTimestampPresentInCommitTableReturnsAbort() throws Exception { - - TSOClient client = TSOClient.newInstance(tsoClientConf); - TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT); - - long tx1ST = client.getNewStartTimestamp().get(); - // Invalidate the transaction - commitTable.getClient().tryInvalidateTransaction(tx1ST); - - clientOneShot.makeRequest(createRetryCommitRequest(tx1ST)); - TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST)); - assertTrue(response.getCommitResponse().getAborted(), "Transaction should be aborted"); - assertFalse(response.getCommitResponse().getMakeHeuristicDecision()); - assertEquals(response.getCommitResponse().getCommitTimestamp(), 0); - } - - @Test - public void testCommitTimestampNotPresentInCommitTableReturnsAnAbort() throws Exception { - - TSOClient client = TSOClient.newInstance(tsoClientConf); - TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT); - - long tx1ST = client.getNewStartTimestamp().get(); - - clientOneShot.makeRequest(createRetryCommitRequest(tx1ST)); - - // Simulate remove entry from the commit table before exercise retry - commitTable.getClient().completeTransaction(tx1ST); - - TSOProto.Response response = clientOneShot.makeRequest(createRetryCommitRequest(tx1ST)); - assertTrue(response.getCommitResponse().getAborted(), "Transaction should abort"); - assertFalse(response.getCommitResponse().getMakeHeuristicDecision()); - assertEquals(response.getCommitResponse().getCommitTimestamp(), 0); - } - // ---------------------------------------------------------------------------------------------------------------- - // The previous 3 tests are similar to the ones in TestRetryProcessor but checking the result on the TSOClient side - // (They exercise the communication protocol) TODO Remove??? - // ---------------------------------------------------------------------------------------------------------------- - - // ---------------------------------------------------------------------------------------------------------------- - // Helper methods - // ---------------------------------------------------------------------------------------------------------------- - - private TSOProto.Request createRetryCommitRequest(long ts) { - return createCommitRequest(ts, true, testWriteSet); - } - - private TSOProto.Request createCommitRequest(long ts, boolean retry, Set<CellId> writeSet) { - TSOProto.Request.Builder builder = TSOProto.Request.newBuilder(); - TSOProto.CommitRequest.Builder commitBuilder = TSOProto.CommitRequest.newBuilder(); - commitBuilder.setStartTimestamp(ts); - commitBuilder.setIsRetry(retry); - for (CellId cell : writeSet) { - commitBuilder.addCellId(cell.getCellId()); - } - return builder.setCommitRequest(commitBuilder.build()).build(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/tso-server/src/test/java/com/yahoo/omid/tsoclient/TestTSOClientResponseHandling.java ---------------------------------------------------------------------- diff --git a/tso-server/src/test/java/com/yahoo/omid/tsoclient/TestTSOClientResponseHandling.java b/tso-server/src/test/java/com/yahoo/omid/tsoclient/TestTSOClientResponseHandling.java deleted file mode 100644 index 66c3628..0000000 --- a/tso-server/src/test/java/com/yahoo/omid/tsoclient/TestTSOClientResponseHandling.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.yahoo.omid.tsoclient; - -import com.yahoo.omid.tso.ProgrammableTSOServer; -import com.yahoo.omid.tso.ProgrammableTSOServer.AbortResponse; -import com.yahoo.omid.tso.ProgrammableTSOServer.CommitResponse; -import com.yahoo.omid.tso.ProgrammableTSOServer.TimestampResponse; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import java.io.IOException; -import java.util.Collections; -import java.util.concurrent.ExecutionException; - -import static org.testng.Assert.assertEquals; - -public class TestTSOClientResponseHandling { - - private static final int TSO_PORT = 4321; - private static final long START_TS = 1L; - private static final long COMMIT_TS = 2L; - - private ProgrammableTSOServer tsoServer = new ProgrammableTSOServer(TSO_PORT); - // Client under test - private TSOClient tsoClient; - - @BeforeClass - public void configureAndCreateClient() throws IOException, InterruptedException { - - OmidClientConfiguration tsoClientConf = new OmidClientConfiguration(); - tsoClientConf.setConnectionString("localhost:" + TSO_PORT); - tsoClient = TSOClient.newInstance(tsoClientConf); - } - - @BeforeMethod - public void reset() { - tsoServer.cleanResponses(); - } - - @Test - public void testTimestampRequestReceivingASuccessfulResponse() throws Exception { - // test request timestamp response returns a timestamp - - // Program the TSO to return an ad-hoc Timestamp response - tsoServer.queueResponse(new TimestampResponse(START_TS)); - - long startTS = tsoClient.getNewStartTimestamp().get(); - assertEquals(startTS, START_TS); - } - - @Test - public void testCommitRequestReceivingAnAbortResponse() throws Exception { - // test commit request which is aborted on the server side - // (e.g. due to conflicts with other transaction) throws an - // execution exception with an AbortException as a cause - - // Program the TSO to return an Abort response - tsoServer.queueResponse(new AbortResponse(START_TS)); - - try { - tsoClient.commit(START_TS, Collections.<CellId>emptySet()).get(); - } catch (ExecutionException ee) { - assertEquals(ee.getCause().getClass(), AbortException.class); - } - } - - @Test - public void testCommitRequestReceivingASuccessfulResponse() throws Exception { - // test commit request which is successfully committed on the server - // side returns a commit timestamp - - // Program the TSO to return an Commit response (with no required heuristic actions) - tsoServer.queueResponse(new CommitResponse(false, START_TS, COMMIT_TS)); - - long commitTS = tsoClient.commit(START_TS, Collections.<CellId>emptySet()).get(); - assertEquals(commitTS, COMMIT_TS); - } - - @Test - public void testCommitRequestReceivingAHeuristicResponse() throws Exception { - // test commit request which needs heuristic actions from the client - // throws an execution exception with a NewTSOException as a cause - - // Program the TSO to return an Commit response requiring heuristic actions - tsoServer.queueResponse(new CommitResponse(true, START_TS, COMMIT_TS)); - try { - tsoClient.commit(START_TS, Collections.<CellId>emptySet()).get(); - } catch (ExecutionException ee) { - assertEquals(ee.getCause().getClass(), NewTSOException.class); - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/tso-server/src/test/java/com/yahoo/omid/tsoclient/TestUnconnectedTSOClient.java ---------------------------------------------------------------------- diff --git a/tso-server/src/test/java/com/yahoo/omid/tsoclient/TestUnconnectedTSOClient.java b/tso-server/src/test/java/com/yahoo/omid/tsoclient/TestUnconnectedTSOClient.java deleted file mode 100644 index 641ab0a..0000000 --- a/tso-server/src/test/java/com/yahoo/omid/tsoclient/TestUnconnectedTSOClient.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.yahoo.omid.tsoclient; - -import com.yahoo.omid.tso.util.DummyCellIdImpl; -import com.yahoo.omid.tsoclient.TSOClient.DisconnectedState; -import com.yahoo.statemachine.StateMachine.FsmImpl; -import org.slf4j.Logger; -import org.testng.annotations.Test; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - -import static com.google.common.collect.Sets.newHashSet; -import static org.slf4j.LoggerFactory.getLogger; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.fail; - -/** - * Test the behavior of requests on a TSOClient component that is not connected to a TSO server. - */ -public class TestUnconnectedTSOClient { - - private static final Logger LOG = getLogger(TestUnconnectedTSOClient.class); - - private static final int TSO_RECONNECTION_DELAY_IN_SECS_FOR_TEST = 2; - - @Test(timeOut = 30_000) // 30 secs - public void testRequestsDoneOnAnUnconnectedTSOClientAlwaysReturn() throws Exception { - - OmidClientConfiguration tsoClientConf = new OmidClientConfiguration(); - tsoClientConf.setConnectionString("localhost:12345"); - tsoClientConf.setReconnectionDelayInSecs(TSO_RECONNECTION_DELAY_IN_SECS_FOR_TEST); - - // Component under test - TSOClient tsoClient = TSOClient.newInstance(tsoClientConf); - - // Internal accessor to fsm - FsmImpl fsm = (FsmImpl) tsoClient.fsm; - - assertEquals(fsm.getState().getClass(), DisconnectedState.class); - - // Test requests to the 3 relevant methods in TSO client - - try { - tsoClient.getNewStartTimestamp().get(); - fail(); - } catch (ExecutionException e) { - LOG.info("Exception expected"); - assertEquals(e.getCause().getClass(), ConnectionException.class); - TimeUnit.SECONDS.sleep(TSO_RECONNECTION_DELAY_IN_SECS_FOR_TEST * 2); - assertEquals(fsm.getState().getClass(), DisconnectedState.class); - } - - try { - tsoClient.commit(1, newHashSet(new DummyCellIdImpl(0xdeadbeefL))).get(); - fail(); - } catch (ExecutionException e) { - LOG.info("Exception expected"); - assertEquals(e.getCause().getClass(), ConnectionException.class); - TimeUnit.SECONDS.sleep(TSO_RECONNECTION_DELAY_IN_SECS_FOR_TEST * 2); - assertEquals(fsm.getState().getClass(), DisconnectedState.class); - } - - tsoClient.close().get(); - LOG.info("No exception expected"); - } - -}