http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/distributed/ElasticPoolTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/distributed/ElasticPoolTest.java b/tephra-core/src/test/java/co/cask/tephra/distributed/ElasticPoolTest.java deleted file mode 100644 index 40354cf..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/distributed/ElasticPoolTest.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.distributed; - -import com.google.common.base.Throwables; -import org.junit.Assert; -import org.junit.Test; - -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Tests for {@link ElasticPool}. - */ -public class ElasticPoolTest { - - static class Dummy { - static AtomicInteger count = new AtomicInteger(0); - boolean valid = true; - Dummy() { - count.incrementAndGet(); - } - void markInvalid() { - valid = false; - } - - public boolean isValid() { - return valid; - } - } - - class DummyPool extends ElasticPool<Dummy, RuntimeException> { - - public DummyPool(int sizeLimit) { - super(sizeLimit); - } - - @Override - protected Dummy create() { - return new Dummy(); - } - - @Override - protected boolean recycle(Dummy element) { - return element.isValid(); - } - } - - @Test(timeout = 5000) - public void testFewerThreadsThanElements() throws InterruptedException { - final DummyPool pool = new DummyPool(5); - Dummy.count.set(0); - createAndRunThreads(2, pool, false); - // we only ran 2 threads, so only 2 elements got created, even though pool size is 5 - Assert.assertEquals(2, Dummy.count.get()); - } - - @Test(timeout = 5000) - public void testMoreThreadsThanElements() throws InterruptedException { - final DummyPool pool = new DummyPool(2); - Dummy.count.set(0); - createAndRunThreads(5, pool, false); - // even though we ran 5 threads, only 2 elements got created because pool size is 2 - Assert.assertEquals(2, Dummy.count.get()); - } - - @Test(timeout = 5000) - public void testMoreThreadsThanElementsWithDiscard() throws InterruptedException { - final DummyPool pool = new DummyPool(2); - Dummy.count.set(0); - int numThreads = 3; - // pass 'true' as the last parameter, which results in the elements being discarded after each obtain() call. - createAndRunThreads(numThreads, pool, true); - // this results in (5 * numThreads) number of elements being created since each thread obtains a client 5 times. - Assert.assertEquals(5 * numThreads, Dummy.count.get()); - } - - // Creates a list of threads which obtain a client from the pool, sleeps for a certain amount of time, and then - // releases the client back to the pool, optionally marking it invalid before doing so. It repeats this five times. - // Then, runs these threads to completion. - private void createAndRunThreads(int numThreads, final DummyPool pool, - final boolean discardAtEnd) throws InterruptedException { - Thread[] threads = new Thread[numThreads]; - for (int i = 0; i < numThreads; i++) { - threads[i] = new Thread() { - @Override - public void run() { - for (int j = 0; j < 5; ++j) { - Dummy dummy; - try { - dummy = pool.obtain(); - } catch (InterruptedException e) { - throw Throwables.propagate(e); - } - try { - Thread.sleep(10L); - } catch (InterruptedException e) { - // ignored - } - if (discardAtEnd) { - dummy.markInvalid(); - } - pool.release(dummy); - } - } - }; - } - for (Thread t : threads) { - t.start(); - } - for (Thread t : threads) { - t.join(); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/distributed/PooledClientProviderTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/distributed/PooledClientProviderTest.java b/tephra-core/src/test/java/co/cask/tephra/distributed/PooledClientProviderTest.java deleted file mode 100644 index 354b5b7..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/distributed/PooledClientProviderTest.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.distributed; - -import co.cask.tephra.TransactionServiceMain; -import co.cask.tephra.TxConstants; -import co.cask.tephra.runtime.ConfigModule; -import co.cask.tephra.runtime.DiscoveryModules; -import co.cask.tephra.runtime.TransactionClientModule; -import co.cask.tephra.runtime.TransactionModules; -import co.cask.tephra.runtime.ZKModule; -import com.google.common.base.Throwables; -import com.google.inject.Guice; -import com.google.inject.Injector; -import org.apache.hadoop.conf.Configuration; -import org.apache.twill.discovery.DiscoveryServiceClient; -import org.apache.twill.internal.zookeeper.InMemoryZKServer; -import org.apache.twill.zookeeper.ZKClientService; -import org.junit.Assert; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeoutException; - -public class PooledClientProviderTest { - - public static final int MAX_CLIENT_COUNT = 3; - public static final long CLIENT_OBTAIN_TIMEOUT = 10; - - @ClassRule - public static TemporaryFolder tmpFolder = new TemporaryFolder(); - - @Test - public void testClientConnectionPoolMaximumNumberOfClients() throws Exception { - // We need a server for the client to connect to - InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build(); - zkServer.startAndWait(); - - try { - Configuration conf = new Configuration(); - conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr()); - conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath()); - conf.set("data.tx.client.count", Integer.toString(MAX_CLIENT_COUNT)); - conf.set("data.tx.client.obtain.timeout", Long.toString(CLIENT_OBTAIN_TIMEOUT)); - - final TransactionServiceMain main = new TransactionServiceMain(conf); - final CountDownLatch latch = new CountDownLatch(1); - Thread t = new Thread() { - @Override - public void run() { - try { - main.start(); - latch.countDown(); - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - }; - - try { - t.start(); - // Wait for service to startup - latch.await(); - - startClientAndTestPool(conf); - } finally { - main.stop(); - t.join(); - } - } finally { - zkServer.stopAndWait(); - } - } - - private void startClientAndTestPool(Configuration conf) throws Exception { - Injector injector = Guice.createInjector( - new ConfigModule(conf), - new ZKModule(), - new DiscoveryModules().getDistributedModules(), - new TransactionModules().getDistributedModules(), - new TransactionClientModule() - ); - - ZKClientService zkClient = injector.getInstance(ZKClientService.class); - zkClient.startAndWait(); - - final PooledClientProvider clientProvider = new PooledClientProvider(conf, - injector.getInstance(DiscoveryServiceClient.class)); - - // test simple case of get + return. Note: this also initializes the provider's pool, which - // takes about one second (discovery). Doing it before we test the threads makes it so that one - // thread doesn't take exceptionally longer than the others. - try (CloseableThriftClient closeableThriftClient = clientProvider.getCloseableClient()) { - // do nothing with the client - } - - //Now race to get MAX_CLIENT_COUNT+1 clients, exhausting the pool and requesting 1 more. - List<Future<Integer>> clientIds = new ArrayList<Future<Integer>>(); - CountDownLatch countDownLatch = new CountDownLatch(1); - ExecutorService executor = Executors.newFixedThreadPool(MAX_CLIENT_COUNT + 1); - for (int i = 0; i < MAX_CLIENT_COUNT + 1; i++) { - clientIds.add(executor.submit(new RetrieveClient(clientProvider, CLIENT_OBTAIN_TIMEOUT / 2, countDownLatch))); - } - countDownLatch.countDown(); - - Set<Integer> ids = new HashSet<Integer>(); - for (Future<Integer> id : clientIds) { - ids.add(id.get()); - } - Assert.assertEquals(MAX_CLIENT_COUNT, ids.size()); - - // now, try it again with, where each thread holds onto the client for twice the client.obtain.timeout value. - // one of the threads should throw a TimeOutException, because the other threads don't release their clients - // within the configured timeout. - countDownLatch = new CountDownLatch(1); - for (int i = 0; i < MAX_CLIENT_COUNT + 1; i++) { - clientIds.add(executor.submit(new RetrieveClient(clientProvider, CLIENT_OBTAIN_TIMEOUT * 2, countDownLatch))); - } - countDownLatch.countDown(); - int numTimeoutExceptions = 0; - for (Future<Integer> clientId : clientIds) { - try { - clientId.get(); - } catch (ExecutionException expected) { - Assert.assertEquals(TimeoutException.class, expected.getCause().getClass()); - numTimeoutExceptions++; - } - } - // expect that exactly one of the threads hit the TimeoutException - Assert.assertEquals(String.format("Expected one thread to not obtain a client within %s milliseconds.", - CLIENT_OBTAIN_TIMEOUT), - 1, numTimeoutExceptions); - - executor.shutdown(); - } - - private static class RetrieveClient implements Callable<Integer> { - private final PooledClientProvider pool; - private final long holdClientMs; - private final CountDownLatch begin; - - public RetrieveClient(PooledClientProvider pool, long holdClientMs, - CountDownLatch begin) { - this.pool = pool; - this.holdClientMs = holdClientMs; - this.begin = begin; - } - - @Override - public Integer call() throws Exception { - begin.await(); - try (CloseableThriftClient client = pool.getCloseableClient()) { - int id = System.identityHashCode(client.getThriftClient()); - // "use" the client for a configured amount of milliseconds - Thread.sleep(holdClientMs); - return id; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/distributed/ThriftTransactionServerTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/distributed/ThriftTransactionServerTest.java b/tephra-core/src/test/java/co/cask/tephra/distributed/ThriftTransactionServerTest.java deleted file mode 100644 index 83aad3c..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/distributed/ThriftTransactionServerTest.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.distributed; - -import co.cask.tephra.ThriftTransactionSystemTest; -import co.cask.tephra.TransactionSystemClient; -import co.cask.tephra.TxConstants; -import co.cask.tephra.persist.InMemoryTransactionStateStorage; -import co.cask.tephra.persist.TransactionEdit; -import co.cask.tephra.persist.TransactionLog; -import co.cask.tephra.persist.TransactionStateStorage; -import co.cask.tephra.runtime.ConfigModule; -import co.cask.tephra.runtime.DiscoveryModules; -import co.cask.tephra.runtime.TransactionClientModule; -import co.cask.tephra.runtime.TransactionModules; -import co.cask.tephra.runtime.ZKModule; -import com.google.common.util.concurrent.Service; -import com.google.common.util.concurrent.SettableFuture; -import com.google.inject.AbstractModule; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.google.inject.Scopes; -import com.google.inject.util.Modules; -import org.apache.hadoop.conf.Configuration; -import org.apache.twill.internal.zookeeper.InMemoryZKServer; -import org.apache.twill.zookeeper.ZKClientService; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -/** - * This tests whether transaction service hangs on stop when heavily loaded - https://issues.cask.co/browse/TEPHRA-132 - */ -public class ThriftTransactionServerTest { - private static final Logger LOG = LoggerFactory.getLogger(ThriftTransactionSystemTest.class); - - private static InMemoryZKServer zkServer; - private static ZKClientService zkClientService; - private static TransactionService txService; - private static TransactionStateStorage storage; - static Injector injector; - - private static final int NUM_CLIENTS = 17; - private static final CountDownLatch STORAGE_WAIT_LATCH = new CountDownLatch(1); - private static final CountDownLatch CLIENTS_DONE_LATCH = new CountDownLatch(NUM_CLIENTS); - - @ClassRule - public static TemporaryFolder tmpFolder = new TemporaryFolder(); - - @BeforeClass - public static void start() throws Exception { - zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build(); - zkServer.startAndWait(); - - Configuration conf = new Configuration(); - conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false); - conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr()); - conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times"); - conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1); - conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_COUNT, NUM_CLIENTS); - conf.setLong(TxConstants.Service.CFG_DATA_TX_CLIENT_TIMEOUT, TimeUnit.HOURS.toMillis(1)); - conf.setInt(TxConstants.Service.CFG_DATA_TX_SERVER_IO_THREADS, 2); - conf.setInt(TxConstants.Service.CFG_DATA_TX_SERVER_THREADS, 4); - - injector = Guice.createInjector( - new ConfigModule(conf), - new ZKModule(), - new DiscoveryModules().getDistributedModules(), - Modules.override(new TransactionModules().getDistributedModules()) - .with(new AbstractModule() { - @Override - protected void configure() { - bind(TransactionStateStorage.class).to(SlowTransactionStorage.class).in(Scopes.SINGLETON); - } - }), - new TransactionClientModule() - ); - - zkClientService = injector.getInstance(ZKClientService.class); - zkClientService.startAndWait(); - - // start a tx server - txService = injector.getInstance(TransactionService.class); - storage = injector.getInstance(TransactionStateStorage.class); - try { - LOG.info("Starting transaction service"); - txService.startAndWait(); - } catch (Exception e) { - LOG.error("Failed to start service: ", e); - } - } - - @Before - public void reset() throws Exception { - getClient().resetState(); - } - - @AfterClass - public static void stop() throws Exception { - txService.stopAndWait(); - storage.stopAndWait(); - zkClientService.stopAndWait(); - zkServer.stopAndWait(); - } - - public TransactionSystemClient getClient() throws Exception { - return injector.getInstance(TransactionSystemClient.class); - } - - @Test - public void testThriftServerStop() throws Exception { - int nThreads = NUM_CLIENTS; - ExecutorService executorService = Executors.newFixedThreadPool(nThreads); - for (int i = 0; i < nThreads; ++i) { - executorService.submit(new Runnable() { - @Override - public void run() { - try { - TransactionSystemClient txClient = getClient(); - CLIENTS_DONE_LATCH.countDown(); - txClient.startShort(); - } catch (Exception e) { - // Exception expected - } - } - }); - } - - // Wait till all clients finish sending reqeust to transaction manager - CLIENTS_DONE_LATCH.await(); - TimeUnit.SECONDS.sleep(1); - - // Expire zookeeper session, which causes Thrift server to stop. - expireZkSession(zkClientService); - waitForThriftTermination(); - - // Stop Zookeeper client so that it does not re-connect to Zookeeper and start Thrift sever again. - zkClientService.stopAndWait(); - STORAGE_WAIT_LATCH.countDown(); - TimeUnit.SECONDS.sleep(1); - - // Make sure Thrift server stopped. - Assert.assertEquals(Service.State.TERMINATED, txService.thriftRPCServerState()); - } - - private void expireZkSession(ZKClientService zkClientService) throws Exception { - ZooKeeper zooKeeper = zkClientService.getZooKeeperSupplier().get(); - final SettableFuture<?> connectFuture = SettableFuture.create(); - Watcher watcher = new Watcher() { - @Override - public void process(WatchedEvent event) { - if (event.getState() == Event.KeeperState.SyncConnected) { - connectFuture.set(null); - } - } - }; - - // Create another Zookeeper session with the same sessionId so that the original one expires. - final ZooKeeper dupZookeeper = - new ZooKeeper(zkClientService.getConnectString(), zooKeeper.getSessionTimeout(), watcher, - zooKeeper.getSessionId(), zooKeeper.getSessionPasswd()); - connectFuture.get(30, TimeUnit.SECONDS); - Assert.assertEquals("Failed to re-create current session", dupZookeeper.getState(), ZooKeeper.States.CONNECTED); - dupZookeeper.close(); - } - - private void waitForThriftTermination() throws InterruptedException { - int count = 0; - while (txService.thriftRPCServerState() != Service.State.TERMINATED && count++ < 200) { - TimeUnit.MILLISECONDS.sleep(50); - } - } - - private static class SlowTransactionStorage extends InMemoryTransactionStateStorage { - @Override - public TransactionLog createLog(long timestamp) throws IOException { - return new SlowTransactionLog(timestamp); - } - } - - private static class SlowTransactionLog extends InMemoryTransactionStateStorage.InMemoryTransactionLog { - public SlowTransactionLog(long timestamp) { - super(timestamp); - } - - @Override - public void append(TransactionEdit edit) throws IOException { - try { - STORAGE_WAIT_LATCH.await(); - } catch (InterruptedException e) { - LOG.error("Got exception: ", e); - } - super.append(edit); - } - - @Override - public void append(List<TransactionEdit> edits) throws IOException { - try { - STORAGE_WAIT_LATCH.await(); - } catch (InterruptedException e) { - LOG.error("Got exception: ", e); - } - super.append(edits); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/hbase/AbstractTransactionVisibilityFilterTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/hbase/AbstractTransactionVisibilityFilterTest.java b/tephra-core/src/test/java/co/cask/tephra/hbase/AbstractTransactionVisibilityFilterTest.java deleted file mode 100644 index 6427b07..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/hbase/AbstractTransactionVisibilityFilterTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.hbase; - -import co.cask.tephra.TransactionManager; -import co.cask.tephra.TxConstants; -import co.cask.tephra.util.ConfigurationFactory; -import com.google.common.collect.Lists; -import org.apache.hadoop.conf.Configuration; -import org.junit.After; -import org.junit.Before; - -import java.util.List; - -/** - * Common test class for TransactionVisibilityFilter implementations. - */ -public abstract class AbstractTransactionVisibilityFilterTest { - - protected static final byte[] FAM = new byte[] {'f'}; - protected static final byte[] FAM2 = new byte[] {'f', '2'}; - protected static final byte[] FAM3 = new byte[] {'f', '3'}; - protected static final byte[] COL = new byte[] {'c'}; - protected static final List<byte[]> EMPTY_CHANGESET = Lists.newArrayListWithCapacity(0); - - protected TransactionManager txManager; - - @Before - public void setup() throws Exception { - Configuration conf = new ConfigurationFactory().get(); - conf.unset(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES); - txManager = new TransactionManager(conf); - txManager.startAndWait(); - } - - @After - public void tearDown() throws Exception { - txManager.stopAndWait(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/persist/AbstractTransactionStateStorageTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/AbstractTransactionStateStorageTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/AbstractTransactionStateStorageTest.java deleted file mode 100644 index b2bf69c..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/persist/AbstractTransactionStateStorageTest.java +++ /dev/null @@ -1,555 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import co.cask.tephra.ChangeId; -import co.cask.tephra.Transaction; -import co.cask.tephra.TransactionManager; -import co.cask.tephra.TransactionType; -import co.cask.tephra.TxConstants; -import co.cask.tephra.metrics.TxMetricsCollector; -import co.cask.tephra.util.TransactionEditUtil; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import it.unimi.dsi.fastutil.longs.LongArrayList; -import org.apache.hadoop.conf.Configuration; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -/** - * Commons tests to run against the {@link TransactionStateStorage} implementations. - */ -public abstract class AbstractTransactionStateStorageTest { - private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionStateStorageTest.class); - private static Random random = new Random(); - - protected abstract Configuration getConfiguration(String testName) throws IOException; - - protected abstract AbstractTransactionStateStorage getStorage(Configuration conf); - - @Test - public void testSnapshotPersistence() throws Exception { - Configuration conf = getConfiguration("testSnapshotPersistence"); - - TransactionSnapshot snapshot = createRandomSnapshot(); - TransactionStateStorage storage = getStorage(conf); - try { - storage.startAndWait(); - storage.writeSnapshot(snapshot); - - TransactionSnapshot readSnapshot = storage.getLatestSnapshot(); - assertNotNull(readSnapshot); - assertEquals(snapshot, readSnapshot); - } finally { - storage.stopAndWait(); - } - } - - @Test - public void testLogWriteAndRead() throws Exception { - Configuration conf = getConfiguration("testLogWriteAndRead"); - - // create some random entries - List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(100); - TransactionStateStorage storage = getStorage(conf); - try { - long now = System.currentTimeMillis(); - storage.startAndWait(); - TransactionLog log = storage.createLog(now); - for (TransactionEdit edit : edits) { - log.append(edit); - } - log.close(); - - Collection<TransactionLog> logsToRead = storage.getLogsSince(now); - // should only be our one log - assertNotNull(logsToRead); - assertEquals(1, logsToRead.size()); - TransactionLogReader logReader = logsToRead.iterator().next().getReader(); - assertNotNull(logReader); - - List<TransactionEdit> readEdits = Lists.newArrayListWithExpectedSize(edits.size()); - TransactionEdit nextEdit; - while ((nextEdit = logReader.next()) != null) { - readEdits.add(nextEdit); - } - logReader.close(); - assertEquals(edits.size(), readEdits.size()); - for (int i = 0; i < edits.size(); i++) { - LOG.info("Checking edit " + i); - assertEquals(edits.get(i), readEdits.get(i)); - } - } finally { - storage.stopAndWait(); - } - } - - @Test - public void testTransactionManagerPersistence() throws Exception { - Configuration conf = getConfiguration("testTransactionManagerPersistence"); - conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread - // start snapshot thread, but with long enough interval so we only get snapshots on shutdown - conf.setInt(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 600); - - TransactionStateStorage storage = null; - TransactionStateStorage storage2 = null; - TransactionStateStorage storage3 = null; - try { - storage = getStorage(conf); - TransactionManager txManager = new TransactionManager - (conf, storage, new TxMetricsCollector()); - txManager.startAndWait(); - - // TODO: replace with new persistence tests - final byte[] a = { 'a' }; - final byte[] b = { 'b' }; - // start a tx1, add a change A and commit - Transaction tx1 = txManager.startShort(); - Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a))); - Assert.assertTrue(txManager.commit(tx1)); - // start a tx2 and add a change B - Transaction tx2 = txManager.startShort(); - Assert.assertTrue(txManager.canCommit(tx2, Collections.singleton(b))); - // start a tx3 - Transaction tx3 = txManager.startShort(); - // restart - txManager.stopAndWait(); - TransactionSnapshot origState = txManager.getCurrentState(); - LOG.info("Orig state: " + origState); - - Thread.sleep(100); - // starts a new tx manager - storage2 = getStorage(conf); - txManager = new TransactionManager(conf, storage2, new TxMetricsCollector()); - txManager.startAndWait(); - - // check that the reloaded state matches the old - TransactionSnapshot newState = txManager.getCurrentState(); - LOG.info("New state: " + newState); - assertEquals(origState, newState); - - // commit tx2 - Assert.assertTrue(txManager.commit(tx2)); - // start another transaction, must be greater than tx3 - Transaction tx4 = txManager.startShort(); - Assert.assertTrue(tx4.getTransactionId() > tx3.getTransactionId()); - // tx1 must be visble from tx2, but tx3 and tx4 must not - Assert.assertTrue(tx2.isVisible(tx1.getTransactionId())); - Assert.assertFalse(tx2.isVisible(tx3.getTransactionId())); - Assert.assertFalse(tx2.isVisible(tx4.getTransactionId())); - // add same change for tx3 - Assert.assertFalse(txManager.canCommit(tx3, Collections.singleton(b))); - // check visibility with new xaction - Transaction tx5 = txManager.startShort(); - Assert.assertTrue(tx5.isVisible(tx1.getTransactionId())); - Assert.assertTrue(tx5.isVisible(tx2.getTransactionId())); - Assert.assertFalse(tx5.isVisible(tx3.getTransactionId())); - Assert.assertFalse(tx5.isVisible(tx4.getTransactionId())); - // can commit tx3? - txManager.abort(tx3); - txManager.abort(tx4); - txManager.abort(tx5); - // start new tx and verify its exclude list is empty - Transaction tx6 = txManager.startShort(); - Assert.assertFalse(tx6.hasExcludes()); - txManager.abort(tx6); - - // now start 5 x claim size transactions - Transaction tx = txManager.startShort(); - for (int i = 1; i < 50; i++) { - tx = txManager.startShort(); - } - origState = txManager.getCurrentState(); - - Thread.sleep(100); - // simulate crash by starting a new tx manager without a stopAndWait - storage3 = getStorage(conf); - txManager = new TransactionManager(conf, storage3, new TxMetricsCollector()); - txManager.startAndWait(); - - // verify state again matches (this time should include WAL replay) - newState = txManager.getCurrentState(); - assertEquals(origState, newState); - - // get a new transaction and verify it is greater - Transaction txAfter = txManager.startShort(); - Assert.assertTrue(txAfter.getTransactionId() > tx.getTransactionId()); - } finally { - if (storage != null) { - storage.stopAndWait(); - } - if (storage2 != null) { - storage2.stopAndWait(); - } - if (storage3 != null) { - storage3.stopAndWait(); - } - } - } - - /** - * Tests whether the committed set is advanced properly on WAL replay. - */ - @Test - public void testCommittedSetClearing() throws Exception { - Configuration conf = getConfiguration("testCommittedSetClearing"); - conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread - conf.setInt(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 0); // no periodic snapshots - - TransactionStateStorage storage1 = null; - TransactionStateStorage storage2 = null; - try { - storage1 = getStorage(conf); - TransactionManager txManager = new TransactionManager - (conf, storage1, new TxMetricsCollector()); - txManager.startAndWait(); - - // TODO: replace with new persistence tests - final byte[] a = { 'a' }; - final byte[] b = { 'b' }; - // start a tx1, add a change A and commit - Transaction tx1 = txManager.startShort(); - Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a))); - Assert.assertTrue(txManager.commit(tx1)); - // start a tx2 and add a change B - Transaction tx2 = txManager.startShort(); - Assert.assertTrue(txManager.canCommit(tx2, Collections.singleton(b))); - // start a tx3 - Transaction tx3 = txManager.startShort(); - TransactionSnapshot origState = txManager.getCurrentState(); - LOG.info("Orig state: " + origState); - - // simulate a failure by starting a new tx manager without stopping first - storage2 = getStorage(conf); - txManager = new TransactionManager(conf, storage2, new TxMetricsCollector()); - txManager.startAndWait(); - - // check that the reloaded state matches the old - TransactionSnapshot newState = txManager.getCurrentState(); - LOG.info("New state: " + newState); - assertEquals(origState, newState); - - } finally { - if (storage1 != null) { - storage1.stopAndWait(); - } - if (storage2 != null) { - storage2.stopAndWait(); - } - } - } - - /** - * Tests removal of old snapshots and old transaction logs. - */ - @Test - public void testOldFileRemoval() throws Exception { - Configuration conf = getConfiguration("testOldFileRemoval"); - TransactionStateStorage storage = null; - try { - storage = getStorage(conf); - storage.startAndWait(); - long now = System.currentTimeMillis(); - long writePointer = 1; - Collection<Long> invalid = Lists.newArrayList(); - NavigableMap<Long, TransactionManager.InProgressTx> inprogress = Maps.newTreeMap(); - Map<Long, Set<ChangeId>> committing = Maps.newHashMap(); - Map<Long, Set<ChangeId>> committed = Maps.newHashMap(); - TransactionSnapshot snapshot = new TransactionSnapshot(now, 0, writePointer++, invalid, - inprogress, committing, committed); - TransactionEdit dummyEdit = TransactionEdit.createStarted(1, 0, Long.MAX_VALUE, TransactionType.SHORT); - - // write snapshot 1 - storage.writeSnapshot(snapshot); - TransactionLog log = storage.createLog(now); - log.append(dummyEdit); - log.close(); - - snapshot = new TransactionSnapshot(now + 1, 0, writePointer++, invalid, inprogress, committing, committed); - // write snapshot 2 - storage.writeSnapshot(snapshot); - log = storage.createLog(now + 1); - log.append(dummyEdit); - log.close(); - - snapshot = new TransactionSnapshot(now + 2, 0, writePointer++, invalid, inprogress, committing, committed); - // write snapshot 3 - storage.writeSnapshot(snapshot); - log = storage.createLog(now + 2); - log.append(dummyEdit); - log.close(); - - snapshot = new TransactionSnapshot(now + 3, 0, writePointer++, invalid, inprogress, committing, committed); - // write snapshot 4 - storage.writeSnapshot(snapshot); - log = storage.createLog(now + 3); - log.append(dummyEdit); - log.close(); - - snapshot = new TransactionSnapshot(now + 4, 0, writePointer++, invalid, inprogress, committing, committed); - // write snapshot 5 - storage.writeSnapshot(snapshot); - log = storage.createLog(now + 4); - log.append(dummyEdit); - log.close(); - - snapshot = new TransactionSnapshot(now + 5, 0, writePointer++, invalid, inprogress, committing, committed); - // write snapshot 6 - storage.writeSnapshot(snapshot); - log = storage.createLog(now + 5); - log.append(dummyEdit); - log.close(); - - List<String> allSnapshots = storage.listSnapshots(); - LOG.info("All snapshots: " + allSnapshots); - assertEquals(6, allSnapshots.size()); - List<String> allLogs = storage.listLogs(); - LOG.info("All logs: " + allLogs); - assertEquals(6, allLogs.size()); - - long oldestKept = storage.deleteOldSnapshots(3); - assertEquals(now + 3, oldestKept); - allSnapshots = storage.listSnapshots(); - LOG.info("All snapshots: " + allSnapshots); - assertEquals(3, allSnapshots.size()); - - storage.deleteLogsOlderThan(oldestKept); - allLogs = storage.listLogs(); - LOG.info("All logs: " + allLogs); - assertEquals(3, allLogs.size()); - } finally { - if (storage != null) { - storage.stopAndWait(); - } - } - } - - @Test - public void testLongTxnEditReplay() throws Exception { - Configuration conf = getConfiguration("testLongTxnEditReplay"); - TransactionStateStorage storage = null; - try { - storage = getStorage(conf); - storage.startAndWait(); - - // Create long running txns. Abort one of them, invalidate another, invalidate and abort the last. - long time1 = System.currentTimeMillis(); - long wp1 = time1 * TxConstants.MAX_TX_PER_MS; - TransactionEdit edit1 = TransactionEdit.createStarted(wp1, wp1 - 10, time1 + 100000, TransactionType.LONG); - TransactionEdit edit2 = TransactionEdit.createAborted(wp1, TransactionType.LONG, null); - - long time2 = time1 + 100; - long wp2 = time2 * TxConstants.MAX_TX_PER_MS; - TransactionEdit edit3 = TransactionEdit.createStarted(wp2, wp2 - 10, time2 + 100000, TransactionType.LONG); - TransactionEdit edit4 = TransactionEdit.createInvalid(wp2); - - long time3 = time1 + 200; - long wp3 = time3 * TxConstants.MAX_TX_PER_MS; - TransactionEdit edit5 = TransactionEdit.createStarted(wp3, wp3 - 10, time3 + 100000, TransactionType.LONG); - TransactionEdit edit6 = TransactionEdit.createInvalid(wp3); - TransactionEdit edit7 = TransactionEdit.createAborted(wp3, TransactionType.LONG, null); - - // write transaction edits - TransactionLog log = storage.createLog(time1); - log.append(edit1); - log.append(edit2); - log.append(edit3); - log.append(edit4); - log.append(edit5); - log.append(edit6); - log.append(edit7); - log.close(); - - // Start transaction manager - TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector()); - txm.startAndWait(); - try { - // Verify that all txns are in invalid list. - TransactionSnapshot snapshot1 = txm.getCurrentState(); - Assert.assertEquals(ImmutableList.of(wp1, wp2, wp3), snapshot1.getInvalid()); - Assert.assertEquals(0, snapshot1.getInProgress().size()); - Assert.assertEquals(0, snapshot1.getCommittedChangeSets().size()); - Assert.assertEquals(0, snapshot1.getCommittedChangeSets().size()); - } finally { - txm.stopAndWait(); - } - } finally { - if (storage != null) { - storage.stopAndWait(); - } - } - } - - @Test - public void testTruncateInvalidTxEditReplay() throws Exception { - Configuration conf = getConfiguration("testTruncateInvalidTxEditReplay"); - TransactionStateStorage storage = null; - try { - storage = getStorage(conf); - storage.startAndWait(); - - // Create some txns, and invalidate all of them. - long time1 = System.currentTimeMillis(); - long wp1 = time1 * TxConstants.MAX_TX_PER_MS; - TransactionEdit edit1 = TransactionEdit.createStarted(wp1, wp1 - 10, time1 + 100000, TransactionType.LONG); - TransactionEdit edit2 = TransactionEdit.createInvalid(wp1); - - long time2 = time1 + 100; - long wp2 = time2 * TxConstants.MAX_TX_PER_MS; - TransactionEdit edit3 = TransactionEdit.createStarted(wp2, wp2 - 10, time2 + 10000, TransactionType.SHORT); - TransactionEdit edit4 = TransactionEdit.createInvalid(wp2); - - long time3 = time1 + 2000; - long wp3 = time3 * TxConstants.MAX_TX_PER_MS; - TransactionEdit edit5 = TransactionEdit.createStarted(wp3, wp3 - 10, time3 + 100000, TransactionType.LONG); - TransactionEdit edit6 = TransactionEdit.createInvalid(wp3); - - long time4 = time1 + 2100; - long wp4 = time4 * TxConstants.MAX_TX_PER_MS; - TransactionEdit edit7 = TransactionEdit.createStarted(wp4, wp4 - 10, time4 + 10000, TransactionType.SHORT); - TransactionEdit edit8 = TransactionEdit.createInvalid(wp4); - - // remove wp1 and wp3 from invalid list - TransactionEdit edit9 = TransactionEdit.createTruncateInvalidTx(ImmutableSet.of(wp1, wp3)); - // truncate invalid transactions before time3 - TransactionEdit edit10 = TransactionEdit.createTruncateInvalidTxBefore(time3); - - // write transaction edits - TransactionLog log = storage.createLog(time1); - log.append(edit1); - log.append(edit2); - log.append(edit3); - log.append(edit4); - log.append(edit5); - log.append(edit6); - log.append(edit7); - log.append(edit8); - log.append(edit9); - log.append(edit10); - log.close(); - - // Start transaction manager - TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector()); - txm.startAndWait(); - try { - // Only wp4 should be in invalid list. - TransactionSnapshot snapshot = txm.getCurrentState(); - Assert.assertEquals(ImmutableList.of(wp4), snapshot.getInvalid()); - Assert.assertEquals(0, snapshot.getInProgress().size()); - Assert.assertEquals(0, snapshot.getCommittedChangeSets().size()); - Assert.assertEquals(0, snapshot.getCommittedChangeSets().size()); - } finally { - txm.stopAndWait(); - } - } finally { - if (storage != null) { - storage.stopAndWait(); - } - } - } - - /** - * Generates a new snapshot object with semi-randomly populated values. This does not necessarily accurately - * represent a typical snapshot's distribution of values, as we only set an upper bound on pointer values. - * - * We generate a new snapshot with the contents: - * <ul> - * <li>readPointer = 1M + (random % 1M)</li> - * <li>writePointer = readPointer + 1000</li> - * <li>waterMark = writePointer + 1000</li> - * <li>inProgress = one each for (writePointer - 500)..writePointer, ~ 5% "long" transaction</li> - * <li>invalid = 100 randomly distributed, 0..1M</li> - * <li>committing = one each, (readPointer + 1)..(readPointer + 100)</li> - * <li>committed = one each, (readPointer - 1000)..readPointer</li> - * </ul> - * @return a new snapshot of transaction state. - */ - private TransactionSnapshot createRandomSnapshot() { - // limit readPointer to a reasonable range, but make it > 1M so we can assign enough keys below - long readPointer = (Math.abs(random.nextLong()) % 1000000L) + 1000000L; - long writePointer = readPointer + 1000L; - - // generate in progress -- assume last 500 write pointer values - NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(); - long startPointer = writePointer - 500L; - for (int i = 0; i < 500; i++) { - long currentTime = System.currentTimeMillis(); - // make some "long" transactions - if (i % 20 == 0) { - inProgress.put(startPointer + i, - new TransactionManager.InProgressTx(startPointer - 1, currentTime + TimeUnit.DAYS.toSeconds(1), - TransactionType.LONG)); - } else { - inProgress.put(startPointer + i, - new TransactionManager.InProgressTx(startPointer - 1, currentTime + 300000L, - TransactionType.SHORT)); - } - } - - // make 100 random invalid IDs - LongArrayList invalid = new LongArrayList(); - for (int i = 0; i < 100; i++) { - invalid.add(Math.abs(random.nextLong()) % 1000000L); - } - - // make 100 committing entries, 10 keys each - Map<Long, Set<ChangeId>> committing = Maps.newHashMap(); - for (int i = 0; i < 100; i++) { - committing.put(readPointer + i, generateChangeSet(10)); - } - - // make 1000 committed entries, 10 keys each - long startCommitted = readPointer - 1000L; - NavigableMap<Long, Set<ChangeId>> committed = Maps.newTreeMap(); - for (int i = 0; i < 1000; i++) { - committed.put(startCommitted + i, generateChangeSet(10)); - } - - return new TransactionSnapshot(System.currentTimeMillis(), readPointer, writePointer, - invalid, inProgress, committing, committed); - } - - private Set<ChangeId> generateChangeSet(int numEntries) { - Set<ChangeId> changes = Sets.newHashSet(); - for (int i = 0; i < numEntries; i++) { - byte[] bytes = new byte[8]; - random.nextBytes(bytes); - changes.add(new ChangeId(bytes)); - } - return changes; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/persist/CommitMarkerCodecTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/CommitMarkerCodecTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/CommitMarkerCodecTest.java deleted file mode 100644 index da876fa..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/persist/CommitMarkerCodecTest.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import co.cask.tephra.TxConstants; -import com.google.common.primitives.Ints; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - -/** - * Unit Test for {@link CommitMarkerCodec}. - */ -public class CommitMarkerCodecTest { - - @ClassRule - public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder(); - private static final String LOG_FILE = "txlog"; - private static final Random RANDOM = new Random(); - - private static MiniDFSCluster dfsCluster; - private static Configuration conf; - private static FileSystem fs; - - @BeforeClass - public static void setupBeforeClass() throws Exception { - Configuration hConf = new Configuration(); - hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TMP_FOLDER.newFolder().getAbsolutePath()); - - dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build(); - conf = new Configuration(dfsCluster.getFileSystem().getConf()); - fs = FileSystem.newInstance(FileSystem.getDefaultUri(conf), conf); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - dfsCluster.shutdown(); - } - - @Test - public void testRandomCommitMarkers() throws Exception { - List<Integer> randomInts = new ArrayList<>(); - Path newLog = new Path(TMP_FOLDER.newFolder().getAbsolutePath(), LOG_FILE); - - // Write a bunch of random commit markers - try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, newLog, LongWritable.class, - LongWritable.class, - SequenceFile.CompressionType.NONE)) { - for (int i = 0; i < 1000; i++) { - int randomNum = RANDOM.nextInt(Integer.MAX_VALUE); - CommitMarkerCodec.writeMarker(writer, randomNum); - randomInts.add(randomNum); - } - writer.hflush(); - writer.hsync(); - } - - // Read the commit markers back to verify the marker - try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, newLog, conf); - CommitMarkerCodec markerCodec = new CommitMarkerCodec()) { - for (int num : randomInts) { - Assert.assertEquals(num, markerCodec.readMarker(reader)); - } - } - } - - private static class IncompleteValueBytes implements SequenceFile.ValueBytes { - - @Override - public void writeUncompressedBytes(DataOutputStream outStream) throws IOException { - // don't write anything to simulate incomplete record - } - - @Override - public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException { - throw new IllegalArgumentException("Not possible"); - } - - @Override - public int getSize() { - return Ints.BYTES; - } - } - - @Test - public void testIncompleteCommitMarker() throws Exception { - Path newLog = new Path(TMP_FOLDER.newFolder().getAbsolutePath(), LOG_FILE); - try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, newLog, LongWritable.class, - LongWritable.class, - SequenceFile.CompressionType.NONE)) { - String key = TxConstants.TransactionLog.NUM_ENTRIES_APPENDED; - SequenceFile.ValueBytes valueBytes = new IncompleteValueBytes(); - writer.appendRaw(key.getBytes(), 0, key.length(), valueBytes); - writer.hflush(); - writer.hsync(); - } - - // Read the incomplete commit marker - try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, newLog, conf); - CommitMarkerCodec markerCodec = new CommitMarkerCodec()) { - try { - markerCodec.readMarker(reader); - Assert.fail("Expected EOF Exception to be thrown"); - } catch (EOFException e) { - // expected since we didn't write the value bytes - } - } - } - - @Test - public void testIncorrectCommitMarker() throws Exception { - Path newLog = new Path(TMP_FOLDER.newFolder().getAbsolutePath(), LOG_FILE); - - // Write an incorrect marker - try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, newLog, LongWritable.class, - LongWritable.class, - SequenceFile.CompressionType.NONE)) { - String invalidKey = "IncorrectKey"; - SequenceFile.ValueBytes valueBytes = new CommitMarkerCodec.CommitEntriesCount(100); - writer.appendRaw(invalidKey.getBytes(), 0, invalidKey.length(), valueBytes); - writer.hflush(); - writer.hsync(); - } - - // Read the commit markers back to verify the marker - try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, newLog, conf); - CommitMarkerCodec markerCodec = new CommitMarkerCodec()) { - try { - markerCodec.readMarker(reader); - Assert.fail("Expected an IOException to be thrown"); - } catch (IOException e) { - // expected - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionLogTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionLogTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionLogTest.java deleted file mode 100644 index 96015d1..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionLogTest.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import co.cask.tephra.TxConstants; -import co.cask.tephra.metrics.MetricsCollector; -import co.cask.tephra.metrics.TxMetricsCollector; -import co.cask.tephra.util.TransactionEditUtil; -import com.google.common.io.Closeables; -import com.google.common.primitives.Longs; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; - - -/** - * Testing for complete and partial sycs of {@link TransactionEdit} to {@link HDFSTransactionLog} - */ -public class HDFSTransactionLogTest { - @ClassRule - public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder(); - private static final String LOG_FILE_PREFIX = "txlog."; - - private static MiniDFSCluster dfsCluster; - private static Configuration conf; - private static MetricsCollector metricsCollector; - - @BeforeClass - public static void setupBeforeClass() throws Exception { - Configuration hConf = new Configuration(); - hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TMP_FOLDER.newFolder().getAbsolutePath()); - - dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build(); - conf = new Configuration(dfsCluster.getFileSystem().getConf()); - metricsCollector = new TxMetricsCollector(); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - dfsCluster.shutdown(); - } - - private Configuration getConfiguration() throws IOException { - // tests should use the current user for HDFS - conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER); - conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, TMP_FOLDER.newFolder().getAbsolutePath()); - return conf; - } - - private HDFSTransactionLog getHDFSTransactionLog(Configuration conf, - FileSystem fs, long timeInMillis) throws Exception { - String snapshotDir = conf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR); - Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timeInMillis); - return new HDFSTransactionLog(fs, conf, newLog, timeInMillis, metricsCollector); - } - - private SequenceFile.Writer getSequenceFileWriter(Configuration configuration, FileSystem fs, - long timeInMillis, boolean withMarker) throws IOException { - String snapshotDir = configuration.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR); - Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timeInMillis); - SequenceFile.Metadata metadata = new SequenceFile.Metadata(); - if (withMarker) { - metadata.set(new Text(TxConstants.TransactionLog.VERSION_KEY), - new Text(Byte.toString(TxConstants.TransactionLog.CURRENT_VERSION))); - } - return SequenceFile.createWriter(fs, configuration, newLog, LongWritable.class, - TransactionEdit.class, SequenceFile.CompressionType.NONE, null, null, metadata); - } - - private void writeNumWrites(SequenceFile.Writer writer, final int size) throws Exception { - String key = TxConstants.TransactionLog.NUM_ENTRIES_APPENDED; - CommitMarkerCodec.writeMarker(writer, size); - } - - private void testTransactionLogSync(int totalCount, int batchSize, boolean withMarker, boolean isComplete) - throws Exception { - List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(totalCount); - long timestamp = System.currentTimeMillis(); - Configuration configuration = getConfiguration(); - FileSystem fs = FileSystem.newInstance(FileSystem.getDefaultUri(configuration), configuration); - SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, withMarker); - AtomicLong logSequence = new AtomicLong(); - HDFSTransactionLog transactionLog = getHDFSTransactionLog(configuration, fs, timestamp); - AbstractTransactionLog.Entry entry; - - for (int i = 0; i < totalCount - batchSize; i += batchSize) { - if (withMarker) { - writeNumWrites(writer, batchSize); - } - for (int j = 0; j < batchSize; j++) { - entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(j)); - writer.append(entry.getKey(), entry.getEdit()); - } - writer.syncFs(); - } - - if (withMarker) { - writeNumWrites(writer, batchSize); - } - - for (int i = totalCount - batchSize; i < totalCount - 1; i++) { - entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(i)); - writer.append(entry.getKey(), entry.getEdit()); - } - - entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), - edits.get(totalCount - 1)); - if (isComplete) { - writer.append(entry.getKey(), entry.getEdit()); - } else { - byte[] bytes = Longs.toByteArray(entry.getKey().get()); - writer.appendRaw(bytes, 0, bytes.length, new SequenceFile.ValueBytes() { - @Override - public void writeUncompressedBytes(DataOutputStream outStream) throws IOException { - byte[] test = new byte[]{0x2}; - outStream.write(test, 0, 1); - } - - @Override - public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException { - // no-op - } - - @Override - public int getSize() { - // mimic size longer than the actual byte array size written, so we would reach EOF - return 12; - } - }); - } - writer.syncFs(); - Closeables.closeQuietly(writer); - - // now let's try to read this log - TransactionLogReader reader = transactionLog.getReader(); - int syncedEdits = 0; - while (reader.next() != null) { - // testing reading the transaction edits - syncedEdits++; - } - if (isComplete) { - Assert.assertEquals(totalCount, syncedEdits); - } else { - Assert.assertEquals(totalCount - batchSize, syncedEdits); - } - } - - @Test - public void testTransactionLogNewVersion() throws Exception { - // in-complete sync - testTransactionLogSync(1000, 1, true, false); - testTransactionLogSync(2000, 5, true, false); - - // complete sync - testTransactionLogSync(1000, 1, true, true); - testTransactionLogSync(2000, 5, true, true); - } - - @Test - public void testTransactionLogOldVersion() throws Exception { - // in-complete sync - testTransactionLogSync(1000, 1, false, false); - - // complete sync - testTransactionLogSync(2000, 5, false, true); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java deleted file mode 100644 index 30ce455..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import co.cask.tephra.TxConstants; -import co.cask.tephra.metrics.TxMetricsCollector; -import co.cask.tephra.snapshot.SnapshotCodecProvider; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.rules.TemporaryFolder; - -import java.io.IOException; - - -/** - * Tests persistence of transaction snapshots and write-ahead logs to HDFS storage, using the - * {@link HDFSTransactionStateStorage} and {@link HDFSTransactionLog} implementations. - */ -public class HDFSTransactionStateStorageTest extends AbstractTransactionStateStorageTest { - - @ClassRule - public static TemporaryFolder tmpFolder = new TemporaryFolder(); - - private static MiniDFSCluster dfsCluster; - private static Configuration conf; - - @BeforeClass - public static void setupBeforeClass() throws Exception { - Configuration hConf = new Configuration(); - hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath()); - - dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build(); - conf = new Configuration(dfsCluster.getFileSystem().getConf()); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - dfsCluster.shutdown(); - } - - @Override - protected Configuration getConfiguration(String testName) throws IOException { - // tests should use the current user for HDFS - conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER); - conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath()); - return conf; - } - - @Override - protected AbstractTransactionStateStorage getStorage(Configuration conf) { - return new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/persist/InMemoryTransactionStateStorage.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/InMemoryTransactionStateStorage.java b/tephra-core/src/test/java/co/cask/tephra/persist/InMemoryTransactionStateStorage.java deleted file mode 100644 index 1b69c7a..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/persist/InMemoryTransactionStateStorage.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import com.google.common.base.Function; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.AbstractIdleService; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; -import javax.annotation.Nullable; - -/** - * Stores the latest transaction snapshot and logs in memory. - */ -public class InMemoryTransactionStateStorage extends AbstractIdleService implements TransactionStateStorage { - // only keeps the most recent snapshot in memory - private TransactionSnapshot lastSnapshot; - - private NavigableMap<Long, TransactionLog> logs = new TreeMap<>(); - - @Override - protected void startUp() throws Exception { - } - - @Override - protected void shutDown() throws Exception { - lastSnapshot = null; - logs = new TreeMap<>(); - } - - @Override - public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException { - // no codecs in in-memory mode - } - - @Override - public void writeSnapshot(TransactionSnapshot snapshot) throws IOException { - lastSnapshot = snapshot; - } - - @Override - public TransactionSnapshot getLatestSnapshot() throws IOException { - return lastSnapshot; - } - - @Override - public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException { - return lastSnapshot; - } - - @Override - public long deleteOldSnapshots(int numberToKeep) throws IOException { - // always only keep the last snapshot - return lastSnapshot.getTimestamp(); - } - - @Override - public List<String> listSnapshots() throws IOException { - List<String> snapshots = Lists.newArrayListWithCapacity(1); - if (lastSnapshot != null) { - snapshots.add(Long.toString(lastSnapshot.getTimestamp())); - } - return snapshots; - } - - @Override - public List<TransactionLog> getLogsSince(long timestamp) throws IOException { - return Lists.newArrayList(logs.tailMap(timestamp).values()); - } - - @Override - public TransactionLog createLog(long timestamp) throws IOException { - TransactionLog log = new InMemoryTransactionLog(timestamp); - logs.put(timestamp, log); - return log; - } - - @Override - public void deleteLogsOlderThan(long timestamp) throws IOException { - Iterator<Map.Entry<Long, TransactionLog>> logIter = logs.entrySet().iterator(); - while (logIter.hasNext()) { - Map.Entry<Long, TransactionLog> logEntry = logIter.next(); - if (logEntry.getKey() < timestamp) { - logIter.remove(); - } - } - } - - @Override - public void setupStorage() throws IOException { - } - - @Override - public List<String> listLogs() throws IOException { - return Lists.transform(Lists.newArrayList(logs.keySet()), new Function<Long, String>() { - @Nullable - @Override - public String apply(@Nullable Long input) { - return input.toString(); - } - }); - } - - @Override - public String getLocation() { - return "in-memory"; - } - - public static class InMemoryTransactionLog implements TransactionLog { - private long timestamp; - private List<TransactionEdit> edits = Lists.newArrayList(); - boolean isClosed = false; - public InMemoryTransactionLog(long timestamp) { - this.timestamp = timestamp; - } - - @Override - public String getName() { - return "in-memory@" + timestamp; - } - - @Override - public long getTimestamp() { - return timestamp; - } - - @Override - public void append(TransactionEdit edit) throws IOException { - if (isClosed) { - throw new IOException("Log is closed"); - } - edits.add(edit); - } - - @Override - public void append(List<TransactionEdit> edits) throws IOException { - if (isClosed) { - throw new IOException("Log is closed"); - } - edits.addAll(edits); - } - - @Override - public void close() { - isClosed = true; - } - - @Override - public TransactionLogReader getReader() throws IOException { - return new InMemoryLogReader(edits.iterator()); - } - } - - public static class InMemoryLogReader implements TransactionLogReader { - private final Iterator<TransactionEdit> editIterator; - - public InMemoryLogReader(Iterator<TransactionEdit> editIterator) { - this.editIterator = editIterator; - } - - @Override - public TransactionEdit next() throws IOException { - if (editIterator.hasNext()) { - return editIterator.next(); - } - return null; - } - - @Override - public TransactionEdit next(TransactionEdit reuse) throws IOException { - return next(); - } - - @Override - public void close() throws IOException { - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/persist/LocalTransactionStateStorageTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/LocalTransactionStateStorageTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/LocalTransactionStateStorageTest.java deleted file mode 100644 index e2886ae..0000000 --- a/tephra-core/src/test/java/co/cask/tephra/persist/LocalTransactionStateStorageTest.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.persist; - -import co.cask.tephra.ChangeId; -import co.cask.tephra.TransactionManager; -import co.cask.tephra.TransactionType; -import co.cask.tephra.TxConstants; -import co.cask.tephra.metrics.TxMetricsCollector; -import co.cask.tephra.snapshot.DefaultSnapshotCodec; -import co.cask.tephra.snapshot.SnapshotCodecProvider; -import co.cask.tephra.snapshot.SnapshotCodecV4; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSortedSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.hadoop.conf.Configuration; -import org.junit.Assert; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.DataOutput; -import java.io.File; -import java.io.IOException; -import java.util.Collection; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -/** - * Runs transaction persistence tests against the {@link LocalFileTransactionStateStorage} and - * {@link LocalFileTransactionLog} implementations. - */ -public class LocalTransactionStateStorageTest extends AbstractTransactionStateStorageTest { - @ClassRule - public static TemporaryFolder tmpDir = new TemporaryFolder(); - - @Override - protected Configuration getConfiguration(String testName) throws IOException { - File testDir = tmpDir.newFolder(testName); - Configuration conf = new Configuration(); - conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath()); - conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName()); - return conf; - } - - @Override - protected AbstractTransactionStateStorage getStorage(Configuration conf) { - return new LocalFileTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector()); - } - - // v2 TransactionEdit - @SuppressWarnings("deprecation") - private class TransactionEditV2 extends TransactionEdit { - public TransactionEditV2(long writePointer, long visibilityUpperBound, State state, long expirationDate, - Set<ChangeId> changes, long commitPointer, boolean canCommit, TransactionType type) { - super(writePointer, visibilityUpperBound, state, expirationDate, changes, commitPointer, canCommit, type, - null, 0L, 0L, null); - } - @Override - public void write(DataOutput out) throws IOException { - TransactionEditCodecs.encode(this, out, new TransactionEditCodecs.TransactionEditCodecV2()); - } - } - - // Note: this test cannot run in AbstractTransactionStateStorageTest, since SequenceFile throws exception saying - // TransactionEditV2 is not TransactionEdit. Since the code path this test is verifying is the same path between - // HDFS and Local Storage, having this only over here is fine. - @SuppressWarnings("deprecation") - @Test - public void testLongTxnBackwardsCompatibility() throws Exception { - Configuration conf = getConfiguration("testLongTxnBackwardsCompatibility"); - - // Use SnapshotCodec version 1 - String latestSnapshotCodec = conf.get(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES); - conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName()); - - TransactionStateStorage storage = null; - try { - storage = getStorage(conf); - storage.startAndWait(); - - // Create transaction snapshot and transaction edits with version when long running txns had -1 expiration. - Collection<Long> invalid = Lists.newArrayList(); - NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(); - long time1 = System.currentTimeMillis(); - long wp1 = time1 * TxConstants.MAX_TX_PER_MS; - inProgress.put(wp1, new TransactionManager.InProgressTx(wp1 - 5, -1L)); - long time2 = time1 + 100; - long wp2 = time2 * TxConstants.MAX_TX_PER_MS; - inProgress.put(wp2, new TransactionManager.InProgressTx(wp2 - 50, time2 + 1000)); - Map<Long, Set<ChangeId>> committing = Maps.newHashMap(); - Map<Long, Set<ChangeId>> committed = Maps.newHashMap(); - TransactionSnapshot snapshot = new TransactionSnapshot(time2, 0, wp2, invalid, - inProgress, committing, committed); - long time3 = time1 + 200; - long wp3 = time3 * TxConstants.MAX_TX_PER_MS; - TransactionEdit edit1 = new TransactionEditV2(wp3, wp3 - 10, TransactionEdit.State.INPROGRESS, -1L, - null, 0L, false, null); - long time4 = time1 + 300; - long wp4 = time4 * TxConstants.MAX_TX_PER_MS; - TransactionEdit edit2 = new TransactionEditV2(wp4, wp4 - 10, TransactionEdit.State.INPROGRESS, time4 + 1000, - null, 0L, false, null); - - // write snapshot and transaction edit - storage.writeSnapshot(snapshot); - TransactionLog log = storage.createLog(time2); - log.append(edit1); - log.append(edit2); - log.close(); - - // Start transaction manager - conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, latestSnapshotCodec); - long longTimeout = TimeUnit.SECONDS.toMillis(conf.getLong(TxConstants.Manager.CFG_TX_LONG_TIMEOUT, - TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT)); - TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector()); - txm.startAndWait(); - try { - // Verify that the txns in old format were read correctly. - // There should be four in-progress transactions, and no invalid transactions - TransactionSnapshot snapshot1 = txm.getCurrentState(); - Assert.assertEquals(ImmutableSortedSet.of(wp1, wp2, wp3, wp4), snapshot1.getInProgress().keySet()); - verifyInProgress(snapshot1.getInProgress().get(wp1), TransactionType.LONG, time1 + longTimeout); - verifyInProgress(snapshot1.getInProgress().get(wp2), TransactionType.SHORT, time2 + 1000); - verifyInProgress(snapshot1.getInProgress().get(wp3), TransactionType.LONG, time3 + longTimeout); - verifyInProgress(snapshot1.getInProgress().get(wp4), TransactionType.SHORT, time4 + 1000); - Assert.assertEquals(0, snapshot1.getInvalid().size()); - } finally { - txm.stopAndWait(); - } - } finally { - if (storage != null) { - storage.stopAndWait(); - } - } - } - - // Note: this test cannot run in AbstractTransactionStateStorageTest, since SequenceFile throws exception saying - // TransactionEditV2 is not TransactionEdit. Since the code path this test is verifying is the same path between - // HDFS and Local Storage, having this only over here is fine. - @SuppressWarnings("deprecation") - @Test - public void testAbortEditBackwardsCompatibility() throws Exception { - Configuration conf = getConfiguration("testAbortEditBackwardsCompatibility"); - - TransactionStateStorage storage = null; - try { - storage = getStorage(conf); - storage.startAndWait(); - - // Create edits for transaction type addition to abort - long time1 = System.currentTimeMillis(); - long wp1 = time1 * TxConstants.MAX_TX_PER_MS; - TransactionEdit edit1 = new TransactionEditV2(wp1, wp1 - 10, TransactionEdit.State.INPROGRESS, -1L, - null, 0L, false, null); - TransactionEdit edit2 = new TransactionEditV2(wp1, 0L, TransactionEdit.State.ABORTED, 0L, - null, 0L, false, null); - - long time2 = time1 + 400; - long wp2 = time2 * TxConstants.MAX_TX_PER_MS; - TransactionEdit edit3 = new TransactionEditV2(wp2, wp2 - 10, TransactionEdit.State.INPROGRESS, time2 + 10000, - null, 0L, false, null); - TransactionEdit edit4 = new TransactionEditV2(wp2, 0L, TransactionEdit.State.INVALID, 0L, null, 0L, false, null); - // Simulate case where we cannot determine txn state during abort - TransactionEdit edit5 = new TransactionEditV2(wp2, 0L, TransactionEdit.State.ABORTED, 0L, null, 0L, false, null); - - // write snapshot and transaction edit - TransactionLog log = storage.createLog(time1); - log.append(edit1); - log.append(edit2); - log.append(edit3); - log.append(edit4); - log.append(edit5); - log.close(); - - // Start transaction manager - TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector()); - txm.startAndWait(); - try { - // Verify that the txns in old format were read correctly. - // Both transactions should be in invalid state - TransactionSnapshot snapshot1 = txm.getCurrentState(); - Assert.assertEquals(ImmutableList.of(wp1, wp2), snapshot1.getInvalid()); - Assert.assertEquals(0, snapshot1.getInProgress().size()); - Assert.assertEquals(0, snapshot1.getCommittedChangeSets().size()); - Assert.assertEquals(0, snapshot1.getCommittingChangeSets().size()); - } finally { - txm.stopAndWait(); - } - } finally { - if (storage != null) { - storage.stopAndWait(); - } - } - } - - private void verifyInProgress(TransactionManager.InProgressTx inProgressTx, TransactionType type, - long expiration) throws Exception { - Assert.assertEquals(type, inProgressTx.getType()); - Assert.assertTrue(inProgressTx.getExpiration() == expiration); - } -}
