http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java new file mode 100644 index 0000000..628bced --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java @@ -0,0 +1,555 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.persist; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.ChangeId; +import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TransactionType; +import org.apache.tephra.TxConstants; +import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.util.TransactionEditUtil; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * Commons tests to run against the {@link TransactionStateStorage} implementations. + */ +public abstract class AbstractTransactionStateStorageTest { + private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionStateStorageTest.class); + private static Random random = new Random(); + + protected abstract Configuration getConfiguration(String testName) throws IOException; + + protected abstract AbstractTransactionStateStorage getStorage(Configuration conf); + + @Test + public void testSnapshotPersistence() throws Exception { + Configuration conf = getConfiguration("testSnapshotPersistence"); + + TransactionSnapshot snapshot = createRandomSnapshot(); + TransactionStateStorage storage = getStorage(conf); + try { + storage.startAndWait(); + storage.writeSnapshot(snapshot); + + TransactionSnapshot readSnapshot = storage.getLatestSnapshot(); + assertNotNull(readSnapshot); + assertEquals(snapshot, readSnapshot); + } finally { + storage.stopAndWait(); + } + } + + @Test + public void testLogWriteAndRead() throws Exception { + Configuration conf = getConfiguration("testLogWriteAndRead"); + + // create some random entries + List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(100); + TransactionStateStorage storage = getStorage(conf); + try { + long now = System.currentTimeMillis(); + storage.startAndWait(); + TransactionLog log = storage.createLog(now); + for (TransactionEdit edit : edits) { + log.append(edit); + } + log.close(); + + Collection<TransactionLog> logsToRead = storage.getLogsSince(now); + // should only be our one log + assertNotNull(logsToRead); + assertEquals(1, logsToRead.size()); + TransactionLogReader logReader = logsToRead.iterator().next().getReader(); + assertNotNull(logReader); + + List<TransactionEdit> readEdits = Lists.newArrayListWithExpectedSize(edits.size()); + TransactionEdit nextEdit; + while ((nextEdit = logReader.next()) != null) { + readEdits.add(nextEdit); + } + logReader.close(); + assertEquals(edits.size(), readEdits.size()); + for (int i = 0; i < edits.size(); i++) { + LOG.info("Checking edit " + i); + assertEquals(edits.get(i), readEdits.get(i)); + } + } finally { + storage.stopAndWait(); + } + } + + @Test + public void testTransactionManagerPersistence() throws Exception { + Configuration conf = getConfiguration("testTransactionManagerPersistence"); + conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread + // start snapshot thread, but with long enough interval so we only get snapshots on shutdown + conf.setInt(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 600); + + TransactionStateStorage storage = null; + TransactionStateStorage storage2 = null; + TransactionStateStorage storage3 = null; + try { + storage = getStorage(conf); + TransactionManager txManager = new TransactionManager + (conf, storage, new TxMetricsCollector()); + txManager.startAndWait(); + + // TODO: replace with new persistence tests + final byte[] a = { 'a' }; + final byte[] b = { 'b' }; + // start a tx1, add a change A and commit + Transaction tx1 = txManager.startShort(); + Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a))); + Assert.assertTrue(txManager.commit(tx1)); + // start a tx2 and add a change B + Transaction tx2 = txManager.startShort(); + Assert.assertTrue(txManager.canCommit(tx2, Collections.singleton(b))); + // start a tx3 + Transaction tx3 = txManager.startShort(); + // restart + txManager.stopAndWait(); + TransactionSnapshot origState = txManager.getCurrentState(); + LOG.info("Orig state: " + origState); + + Thread.sleep(100); + // starts a new tx manager + storage2 = getStorage(conf); + txManager = new TransactionManager(conf, storage2, new TxMetricsCollector()); + txManager.startAndWait(); + + // check that the reloaded state matches the old + TransactionSnapshot newState = txManager.getCurrentState(); + LOG.info("New state: " + newState); + assertEquals(origState, newState); + + // commit tx2 + Assert.assertTrue(txManager.commit(tx2)); + // start another transaction, must be greater than tx3 + Transaction tx4 = txManager.startShort(); + Assert.assertTrue(tx4.getTransactionId() > tx3.getTransactionId()); + // tx1 must be visble from tx2, but tx3 and tx4 must not + Assert.assertTrue(tx2.isVisible(tx1.getTransactionId())); + Assert.assertFalse(tx2.isVisible(tx3.getTransactionId())); + Assert.assertFalse(tx2.isVisible(tx4.getTransactionId())); + // add same change for tx3 + Assert.assertFalse(txManager.canCommit(tx3, Collections.singleton(b))); + // check visibility with new xaction + Transaction tx5 = txManager.startShort(); + Assert.assertTrue(tx5.isVisible(tx1.getTransactionId())); + Assert.assertTrue(tx5.isVisible(tx2.getTransactionId())); + Assert.assertFalse(tx5.isVisible(tx3.getTransactionId())); + Assert.assertFalse(tx5.isVisible(tx4.getTransactionId())); + // can commit tx3? + txManager.abort(tx3); + txManager.abort(tx4); + txManager.abort(tx5); + // start new tx and verify its exclude list is empty + Transaction tx6 = txManager.startShort(); + Assert.assertFalse(tx6.hasExcludes()); + txManager.abort(tx6); + + // now start 5 x claim size transactions + Transaction tx = txManager.startShort(); + for (int i = 1; i < 50; i++) { + tx = txManager.startShort(); + } + origState = txManager.getCurrentState(); + + Thread.sleep(100); + // simulate crash by starting a new tx manager without a stopAndWait + storage3 = getStorage(conf); + txManager = new TransactionManager(conf, storage3, new TxMetricsCollector()); + txManager.startAndWait(); + + // verify state again matches (this time should include WAL replay) + newState = txManager.getCurrentState(); + assertEquals(origState, newState); + + // get a new transaction and verify it is greater + Transaction txAfter = txManager.startShort(); + Assert.assertTrue(txAfter.getTransactionId() > tx.getTransactionId()); + } finally { + if (storage != null) { + storage.stopAndWait(); + } + if (storage2 != null) { + storage2.stopAndWait(); + } + if (storage3 != null) { + storage3.stopAndWait(); + } + } + } + + /** + * Tests whether the committed set is advanced properly on WAL replay. + */ + @Test + public void testCommittedSetClearing() throws Exception { + Configuration conf = getConfiguration("testCommittedSetClearing"); + conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no cleanup thread + conf.setInt(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 0); // no periodic snapshots + + TransactionStateStorage storage1 = null; + TransactionStateStorage storage2 = null; + try { + storage1 = getStorage(conf); + TransactionManager txManager = new TransactionManager + (conf, storage1, new TxMetricsCollector()); + txManager.startAndWait(); + + // TODO: replace with new persistence tests + final byte[] a = { 'a' }; + final byte[] b = { 'b' }; + // start a tx1, add a change A and commit + Transaction tx1 = txManager.startShort(); + Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a))); + Assert.assertTrue(txManager.commit(tx1)); + // start a tx2 and add a change B + Transaction tx2 = txManager.startShort(); + Assert.assertTrue(txManager.canCommit(tx2, Collections.singleton(b))); + // start a tx3 + Transaction tx3 = txManager.startShort(); + TransactionSnapshot origState = txManager.getCurrentState(); + LOG.info("Orig state: " + origState); + + // simulate a failure by starting a new tx manager without stopping first + storage2 = getStorage(conf); + txManager = new TransactionManager(conf, storage2, new TxMetricsCollector()); + txManager.startAndWait(); + + // check that the reloaded state matches the old + TransactionSnapshot newState = txManager.getCurrentState(); + LOG.info("New state: " + newState); + assertEquals(origState, newState); + + } finally { + if (storage1 != null) { + storage1.stopAndWait(); + } + if (storage2 != null) { + storage2.stopAndWait(); + } + } + } + + /** + * Tests removal of old snapshots and old transaction logs. + */ + @Test + public void testOldFileRemoval() throws Exception { + Configuration conf = getConfiguration("testOldFileRemoval"); + TransactionStateStorage storage = null; + try { + storage = getStorage(conf); + storage.startAndWait(); + long now = System.currentTimeMillis(); + long writePointer = 1; + Collection<Long> invalid = Lists.newArrayList(); + NavigableMap<Long, TransactionManager.InProgressTx> inprogress = Maps.newTreeMap(); + Map<Long, Set<ChangeId>> committing = Maps.newHashMap(); + Map<Long, Set<ChangeId>> committed = Maps.newHashMap(); + TransactionSnapshot snapshot = new TransactionSnapshot(now, 0, writePointer++, invalid, + inprogress, committing, committed); + TransactionEdit dummyEdit = TransactionEdit.createStarted(1, 0, Long.MAX_VALUE, TransactionType.SHORT); + + // write snapshot 1 + storage.writeSnapshot(snapshot); + TransactionLog log = storage.createLog(now); + log.append(dummyEdit); + log.close(); + + snapshot = new TransactionSnapshot(now + 1, 0, writePointer++, invalid, inprogress, committing, committed); + // write snapshot 2 + storage.writeSnapshot(snapshot); + log = storage.createLog(now + 1); + log.append(dummyEdit); + log.close(); + + snapshot = new TransactionSnapshot(now + 2, 0, writePointer++, invalid, inprogress, committing, committed); + // write snapshot 3 + storage.writeSnapshot(snapshot); + log = storage.createLog(now + 2); + log.append(dummyEdit); + log.close(); + + snapshot = new TransactionSnapshot(now + 3, 0, writePointer++, invalid, inprogress, committing, committed); + // write snapshot 4 + storage.writeSnapshot(snapshot); + log = storage.createLog(now + 3); + log.append(dummyEdit); + log.close(); + + snapshot = new TransactionSnapshot(now + 4, 0, writePointer++, invalid, inprogress, committing, committed); + // write snapshot 5 + storage.writeSnapshot(snapshot); + log = storage.createLog(now + 4); + log.append(dummyEdit); + log.close(); + + snapshot = new TransactionSnapshot(now + 5, 0, writePointer++, invalid, inprogress, committing, committed); + // write snapshot 6 + storage.writeSnapshot(snapshot); + log = storage.createLog(now + 5); + log.append(dummyEdit); + log.close(); + + List<String> allSnapshots = storage.listSnapshots(); + LOG.info("All snapshots: " + allSnapshots); + assertEquals(6, allSnapshots.size()); + List<String> allLogs = storage.listLogs(); + LOG.info("All logs: " + allLogs); + assertEquals(6, allLogs.size()); + + long oldestKept = storage.deleteOldSnapshots(3); + assertEquals(now + 3, oldestKept); + allSnapshots = storage.listSnapshots(); + LOG.info("All snapshots: " + allSnapshots); + assertEquals(3, allSnapshots.size()); + + storage.deleteLogsOlderThan(oldestKept); + allLogs = storage.listLogs(); + LOG.info("All logs: " + allLogs); + assertEquals(3, allLogs.size()); + } finally { + if (storage != null) { + storage.stopAndWait(); + } + } + } + + @Test + public void testLongTxnEditReplay() throws Exception { + Configuration conf = getConfiguration("testLongTxnEditReplay"); + TransactionStateStorage storage = null; + try { + storage = getStorage(conf); + storage.startAndWait(); + + // Create long running txns. Abort one of them, invalidate another, invalidate and abort the last. + long time1 = System.currentTimeMillis(); + long wp1 = time1 * TxConstants.MAX_TX_PER_MS; + TransactionEdit edit1 = TransactionEdit.createStarted(wp1, wp1 - 10, time1 + 100000, TransactionType.LONG); + TransactionEdit edit2 = TransactionEdit.createAborted(wp1, TransactionType.LONG, null); + + long time2 = time1 + 100; + long wp2 = time2 * TxConstants.MAX_TX_PER_MS; + TransactionEdit edit3 = TransactionEdit.createStarted(wp2, wp2 - 10, time2 + 100000, TransactionType.LONG); + TransactionEdit edit4 = TransactionEdit.createInvalid(wp2); + + long time3 = time1 + 200; + long wp3 = time3 * TxConstants.MAX_TX_PER_MS; + TransactionEdit edit5 = TransactionEdit.createStarted(wp3, wp3 - 10, time3 + 100000, TransactionType.LONG); + TransactionEdit edit6 = TransactionEdit.createInvalid(wp3); + TransactionEdit edit7 = TransactionEdit.createAborted(wp3, TransactionType.LONG, null); + + // write transaction edits + TransactionLog log = storage.createLog(time1); + log.append(edit1); + log.append(edit2); + log.append(edit3); + log.append(edit4); + log.append(edit5); + log.append(edit6); + log.append(edit7); + log.close(); + + // Start transaction manager + TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector()); + txm.startAndWait(); + try { + // Verify that all txns are in invalid list. + TransactionSnapshot snapshot1 = txm.getCurrentState(); + assertEquals(ImmutableList.of(wp1, wp2, wp3), snapshot1.getInvalid()); + assertEquals(0, snapshot1.getInProgress().size()); + assertEquals(0, snapshot1.getCommittedChangeSets().size()); + assertEquals(0, snapshot1.getCommittedChangeSets().size()); + } finally { + txm.stopAndWait(); + } + } finally { + if (storage != null) { + storage.stopAndWait(); + } + } + } + + @Test + public void testTruncateInvalidTxEditReplay() throws Exception { + Configuration conf = getConfiguration("testTruncateInvalidTxEditReplay"); + TransactionStateStorage storage = null; + try { + storage = getStorage(conf); + storage.startAndWait(); + + // Create some txns, and invalidate all of them. + long time1 = System.currentTimeMillis(); + long wp1 = time1 * TxConstants.MAX_TX_PER_MS; + TransactionEdit edit1 = TransactionEdit.createStarted(wp1, wp1 - 10, time1 + 100000, TransactionType.LONG); + TransactionEdit edit2 = TransactionEdit.createInvalid(wp1); + + long time2 = time1 + 100; + long wp2 = time2 * TxConstants.MAX_TX_PER_MS; + TransactionEdit edit3 = TransactionEdit.createStarted(wp2, wp2 - 10, time2 + 10000, TransactionType.SHORT); + TransactionEdit edit4 = TransactionEdit.createInvalid(wp2); + + long time3 = time1 + 2000; + long wp3 = time3 * TxConstants.MAX_TX_PER_MS; + TransactionEdit edit5 = TransactionEdit.createStarted(wp3, wp3 - 10, time3 + 100000, TransactionType.LONG); + TransactionEdit edit6 = TransactionEdit.createInvalid(wp3); + + long time4 = time1 + 2100; + long wp4 = time4 * TxConstants.MAX_TX_PER_MS; + TransactionEdit edit7 = TransactionEdit.createStarted(wp4, wp4 - 10, time4 + 10000, TransactionType.SHORT); + TransactionEdit edit8 = TransactionEdit.createInvalid(wp4); + + // remove wp1 and wp3 from invalid list + TransactionEdit edit9 = TransactionEdit.createTruncateInvalidTx(ImmutableSet.of(wp1, wp3)); + // truncate invalid transactions before time3 + TransactionEdit edit10 = TransactionEdit.createTruncateInvalidTxBefore(time3); + + // write transaction edits + TransactionLog log = storage.createLog(time1); + log.append(edit1); + log.append(edit2); + log.append(edit3); + log.append(edit4); + log.append(edit5); + log.append(edit6); + log.append(edit7); + log.append(edit8); + log.append(edit9); + log.append(edit10); + log.close(); + + // Start transaction manager + TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector()); + txm.startAndWait(); + try { + // Only wp4 should be in invalid list. + TransactionSnapshot snapshot = txm.getCurrentState(); + assertEquals(ImmutableList.of(wp4), snapshot.getInvalid()); + assertEquals(0, snapshot.getInProgress().size()); + assertEquals(0, snapshot.getCommittedChangeSets().size()); + assertEquals(0, snapshot.getCommittedChangeSets().size()); + } finally { + txm.stopAndWait(); + } + } finally { + if (storage != null) { + storage.stopAndWait(); + } + } + } + + /** + * Generates a new snapshot object with semi-randomly populated values. This does not necessarily accurately + * represent a typical snapshot's distribution of values, as we only set an upper bound on pointer values. + * + * We generate a new snapshot with the contents: + * <ul> + * <li>readPointer = 1M + (random % 1M)</li> + * <li>writePointer = readPointer + 1000</li> + * <li>waterMark = writePointer + 1000</li> + * <li>inProgress = one each for (writePointer - 500)..writePointer, ~ 5% "long" transaction</li> + * <li>invalid = 100 randomly distributed, 0..1M</li> + * <li>committing = one each, (readPointer + 1)..(readPointer + 100)</li> + * <li>committed = one each, (readPointer - 1000)..readPointer</li> + * </ul> + * @return a new snapshot of transaction state. + */ + private TransactionSnapshot createRandomSnapshot() { + // limit readPointer to a reasonable range, but make it > 1M so we can assign enough keys below + long readPointer = (Math.abs(random.nextLong()) % 1000000L) + 1000000L; + long writePointer = readPointer + 1000L; + + // generate in progress -- assume last 500 write pointer values + NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(); + long startPointer = writePointer - 500L; + for (int i = 0; i < 500; i++) { + long currentTime = System.currentTimeMillis(); + // make some "long" transactions + if (i % 20 == 0) { + inProgress.put(startPointer + i, + new TransactionManager.InProgressTx(startPointer - 1, currentTime + TimeUnit.DAYS.toSeconds(1), + TransactionType.LONG)); + } else { + inProgress.put(startPointer + i, + new TransactionManager.InProgressTx(startPointer - 1, currentTime + 300000L, + TransactionType.SHORT)); + } + } + + // make 100 random invalid IDs + LongArrayList invalid = new LongArrayList(); + for (int i = 0; i < 100; i++) { + invalid.add(Math.abs(random.nextLong()) % 1000000L); + } + + // make 100 committing entries, 10 keys each + Map<Long, Set<ChangeId>> committing = Maps.newHashMap(); + for (int i = 0; i < 100; i++) { + committing.put(readPointer + i, generateChangeSet(10)); + } + + // make 1000 committed entries, 10 keys each + long startCommitted = readPointer - 1000L; + NavigableMap<Long, Set<ChangeId>> committed = Maps.newTreeMap(); + for (int i = 0; i < 1000; i++) { + committed.put(startCommitted + i, generateChangeSet(10)); + } + + return new TransactionSnapshot(System.currentTimeMillis(), readPointer, writePointer, + invalid, inProgress, committing, committed); + } + + private Set<ChangeId> generateChangeSet(int numEntries) { + Set<ChangeId> changes = Sets.newHashSet(); + for (int i = 0; i < numEntries; i++) { + byte[] bytes = new byte[8]; + random.nextBytes(bytes); + changes.add(new ChangeId(bytes)); + } + return changes; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/persist/CommitMarkerCodecTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/CommitMarkerCodecTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/CommitMarkerCodecTest.java new file mode 100644 index 0000000..22cdb59 --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/persist/CommitMarkerCodecTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.persist; + +import com.google.common.primitives.Ints; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.tephra.TxConstants; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +/** + * Unit Test for {@link CommitMarkerCodec}. + */ +public class CommitMarkerCodecTest { + + @ClassRule + public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder(); + private static final String LOG_FILE = "txlog"; + private static final Random RANDOM = new Random(); + + private static MiniDFSCluster dfsCluster; + private static Configuration conf; + private static FileSystem fs; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + Configuration hConf = new Configuration(); + hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TMP_FOLDER.newFolder().getAbsolutePath()); + + dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build(); + conf = new Configuration(dfsCluster.getFileSystem().getConf()); + fs = FileSystem.newInstance(FileSystem.getDefaultUri(conf), conf); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + dfsCluster.shutdown(); + } + + @Test + public void testRandomCommitMarkers() throws Exception { + List<Integer> randomInts = new ArrayList<>(); + Path newLog = new Path(TMP_FOLDER.newFolder().getAbsolutePath(), LOG_FILE); + + // Write a bunch of random commit markers + try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, newLog, LongWritable.class, + LongWritable.class, + SequenceFile.CompressionType.NONE)) { + for (int i = 0; i < 1000; i++) { + int randomNum = RANDOM.nextInt(Integer.MAX_VALUE); + CommitMarkerCodec.writeMarker(writer, randomNum); + randomInts.add(randomNum); + } + writer.hflush(); + writer.hsync(); + } + + // Read the commit markers back to verify the marker + try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, newLog, conf); + CommitMarkerCodec markerCodec = new CommitMarkerCodec()) { + for (int num : randomInts) { + Assert.assertEquals(num, markerCodec.readMarker(reader)); + } + } + } + + private static class IncompleteValueBytes implements SequenceFile.ValueBytes { + + @Override + public void writeUncompressedBytes(DataOutputStream outStream) throws IOException { + // don't write anything to simulate incomplete record + } + + @Override + public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException { + throw new IllegalArgumentException("Not possible"); + } + + @Override + public int getSize() { + return Ints.BYTES; + } + } + + @Test + public void testIncompleteCommitMarker() throws Exception { + Path newLog = new Path(TMP_FOLDER.newFolder().getAbsolutePath(), LOG_FILE); + try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, newLog, LongWritable.class, + LongWritable.class, + SequenceFile.CompressionType.NONE)) { + String key = TxConstants.TransactionLog.NUM_ENTRIES_APPENDED; + SequenceFile.ValueBytes valueBytes = new IncompleteValueBytes(); + writer.appendRaw(key.getBytes(), 0, key.length(), valueBytes); + writer.hflush(); + writer.hsync(); + } + + // Read the incomplete commit marker + try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, newLog, conf); + CommitMarkerCodec markerCodec = new CommitMarkerCodec()) { + try { + markerCodec.readMarker(reader); + Assert.fail("Expected EOF Exception to be thrown"); + } catch (EOFException e) { + // expected since we didn't write the value bytes + } + } + } + + @Test + public void testIncorrectCommitMarker() throws Exception { + Path newLog = new Path(TMP_FOLDER.newFolder().getAbsolutePath(), LOG_FILE); + + // Write an incorrect marker + try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, newLog, LongWritable.class, + LongWritable.class, + SequenceFile.CompressionType.NONE)) { + String invalidKey = "IncorrectKey"; + SequenceFile.ValueBytes valueBytes = new CommitMarkerCodec.CommitEntriesCount(100); + writer.appendRaw(invalidKey.getBytes(), 0, invalidKey.length(), valueBytes); + writer.hflush(); + writer.hsync(); + } + + // Read the commit markers back to verify the marker + try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, newLog, conf); + CommitMarkerCodec markerCodec = new CommitMarkerCodec()) { + try { + markerCodec.readMarker(reader); + Assert.fail("Expected an IOException to be thrown"); + } catch (IOException e) { + // expected + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java new file mode 100644 index 0000000..7b9f06b --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.persist; + +import com.google.common.io.Closeables; +import com.google.common.primitives.Longs; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.tephra.TxConstants; +import org.apache.tephra.metrics.MetricsCollector; +import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.util.TransactionEditUtil; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + + +/** + * Testing for complete and partial sycs of {@link TransactionEdit} to {@link HDFSTransactionLog} + */ +public class HDFSTransactionLogTest { + @ClassRule + public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder(); + private static final String LOG_FILE_PREFIX = "txlog."; + + private static MiniDFSCluster dfsCluster; + private static Configuration conf; + private static MetricsCollector metricsCollector; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + Configuration hConf = new Configuration(); + hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TMP_FOLDER.newFolder().getAbsolutePath()); + + dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build(); + conf = new Configuration(dfsCluster.getFileSystem().getConf()); + metricsCollector = new TxMetricsCollector(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + dfsCluster.shutdown(); + } + + private Configuration getConfiguration() throws IOException { + // tests should use the current user for HDFS + conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER); + conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, TMP_FOLDER.newFolder().getAbsolutePath()); + return conf; + } + + private HDFSTransactionLog getHDFSTransactionLog(Configuration conf, + FileSystem fs, long timeInMillis) throws Exception { + String snapshotDir = conf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR); + Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timeInMillis); + return new HDFSTransactionLog(fs, conf, newLog, timeInMillis, metricsCollector); + } + + private SequenceFile.Writer getSequenceFileWriter(Configuration configuration, FileSystem fs, + long timeInMillis, boolean withMarker) throws IOException { + String snapshotDir = configuration.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR); + Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timeInMillis); + SequenceFile.Metadata metadata = new SequenceFile.Metadata(); + if (withMarker) { + metadata.set(new Text(TxConstants.TransactionLog.VERSION_KEY), + new Text(Byte.toString(TxConstants.TransactionLog.CURRENT_VERSION))); + } + return SequenceFile.createWriter(fs, configuration, newLog, LongWritable.class, + TransactionEdit.class, SequenceFile.CompressionType.NONE, null, null, metadata); + } + + private void writeNumWrites(SequenceFile.Writer writer, final int size) throws Exception { + String key = TxConstants.TransactionLog.NUM_ENTRIES_APPENDED; + CommitMarkerCodec.writeMarker(writer, size); + } + + private void testTransactionLogSync(int totalCount, int batchSize, boolean withMarker, boolean isComplete) + throws Exception { + List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(totalCount); + long timestamp = System.currentTimeMillis(); + Configuration configuration = getConfiguration(); + FileSystem fs = FileSystem.newInstance(FileSystem.getDefaultUri(configuration), configuration); + SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, withMarker); + AtomicLong logSequence = new AtomicLong(); + HDFSTransactionLog transactionLog = getHDFSTransactionLog(configuration, fs, timestamp); + AbstractTransactionLog.Entry entry; + + for (int i = 0; i < totalCount - batchSize; i += batchSize) { + if (withMarker) { + writeNumWrites(writer, batchSize); + } + for (int j = 0; j < batchSize; j++) { + entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(j)); + writer.append(entry.getKey(), entry.getEdit()); + } + writer.syncFs(); + } + + if (withMarker) { + writeNumWrites(writer, batchSize); + } + + for (int i = totalCount - batchSize; i < totalCount - 1; i++) { + entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(i)); + writer.append(entry.getKey(), entry.getEdit()); + } + + entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), + edits.get(totalCount - 1)); + if (isComplete) { + writer.append(entry.getKey(), entry.getEdit()); + } else { + byte[] bytes = Longs.toByteArray(entry.getKey().get()); + writer.appendRaw(bytes, 0, bytes.length, new SequenceFile.ValueBytes() { + @Override + public void writeUncompressedBytes(DataOutputStream outStream) throws IOException { + byte[] test = new byte[]{0x2}; + outStream.write(test, 0, 1); + } + + @Override + public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException { + // no-op + } + + @Override + public int getSize() { + // mimic size longer than the actual byte array size written, so we would reach EOF + return 12; + } + }); + } + writer.syncFs(); + Closeables.closeQuietly(writer); + + // now let's try to read this log + TransactionLogReader reader = transactionLog.getReader(); + int syncedEdits = 0; + while (reader.next() != null) { + // testing reading the transaction edits + syncedEdits++; + } + if (isComplete) { + Assert.assertEquals(totalCount, syncedEdits); + } else { + Assert.assertEquals(totalCount - batchSize, syncedEdits); + } + } + + @Test + public void testTransactionLogNewVersion() throws Exception { + // in-complete sync + testTransactionLogSync(1000, 1, true, false); + testTransactionLogSync(2000, 5, true, false); + + // complete sync + testTransactionLogSync(1000, 1, true, true); + testTransactionLogSync(2000, 5, true, true); + } + + @Test + public void testTransactionLogOldVersion() throws Exception { + // in-complete sync + testTransactionLogSync(1000, 1, false, false); + + // complete sync + testTransactionLogSync(2000, 5, false, true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionStateStorageTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionStateStorageTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionStateStorageTest.java new file mode 100644 index 0000000..f0eb9e5 --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionStateStorageTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.persist; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.tephra.TxConstants; +import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.snapshot.SnapshotCodecProvider; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; + + +/** + * Tests persistence of transaction snapshots and write-ahead logs to HDFS storage, using the + * {@link HDFSTransactionStateStorage} and {@link HDFSTransactionLog} implementations. + */ +public class HDFSTransactionStateStorageTest extends AbstractTransactionStateStorageTest { + + @ClassRule + public static TemporaryFolder tmpFolder = new TemporaryFolder(); + + private static MiniDFSCluster dfsCluster; + private static Configuration conf; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + Configuration hConf = new Configuration(); + hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath()); + + dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build(); + conf = new Configuration(dfsCluster.getFileSystem().getConf()); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + dfsCluster.shutdown(); + } + + @Override + protected Configuration getConfiguration(String testName) throws IOException { + // tests should use the current user for HDFS + conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER); + conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath()); + return conf; + } + + @Override + protected AbstractTransactionStateStorage getStorage(Configuration conf) { + return new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/persist/InMemoryTransactionStateStorage.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/InMemoryTransactionStateStorage.java b/tephra-core/src/test/java/org/apache/tephra/persist/InMemoryTransactionStateStorage.java new file mode 100644 index 0000000..366542b --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/persist/InMemoryTransactionStateStorage.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.persist; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.AbstractIdleService; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import javax.annotation.Nullable; + +/** + * Stores the latest transaction snapshot and logs in memory. + */ +public class InMemoryTransactionStateStorage extends AbstractIdleService implements TransactionStateStorage { + // only keeps the most recent snapshot in memory + private TransactionSnapshot lastSnapshot; + + private NavigableMap<Long, TransactionLog> logs = new TreeMap<>(); + + @Override + protected void startUp() throws Exception { + } + + @Override + protected void shutDown() throws Exception { + lastSnapshot = null; + logs = new TreeMap<>(); + } + + @Override + public void writeSnapshot(OutputStream out, TransactionSnapshot snapshot) throws IOException { + // no codecs in in-memory mode + } + + @Override + public void writeSnapshot(TransactionSnapshot snapshot) throws IOException { + lastSnapshot = snapshot; + } + + @Override + public TransactionSnapshot getLatestSnapshot() throws IOException { + return lastSnapshot; + } + + @Override + public TransactionVisibilityState getLatestTransactionVisibilityState() throws IOException { + return lastSnapshot; + } + + @Override + public long deleteOldSnapshots(int numberToKeep) throws IOException { + // always only keep the last snapshot + return lastSnapshot.getTimestamp(); + } + + @Override + public List<String> listSnapshots() throws IOException { + List<String> snapshots = Lists.newArrayListWithCapacity(1); + if (lastSnapshot != null) { + snapshots.add(Long.toString(lastSnapshot.getTimestamp())); + } + return snapshots; + } + + @Override + public List<TransactionLog> getLogsSince(long timestamp) throws IOException { + return Lists.newArrayList(logs.tailMap(timestamp).values()); + } + + @Override + public TransactionLog createLog(long timestamp) throws IOException { + TransactionLog log = new InMemoryTransactionLog(timestamp); + logs.put(timestamp, log); + return log; + } + + @Override + public void deleteLogsOlderThan(long timestamp) throws IOException { + Iterator<Map.Entry<Long, TransactionLog>> logIter = logs.entrySet().iterator(); + while (logIter.hasNext()) { + Map.Entry<Long, TransactionLog> logEntry = logIter.next(); + if (logEntry.getKey() < timestamp) { + logIter.remove(); + } + } + } + + @Override + public void setupStorage() throws IOException { + } + + @Override + public List<String> listLogs() throws IOException { + return Lists.transform(Lists.newArrayList(logs.keySet()), new Function<Long, String>() { + @Nullable + @Override + public String apply(@Nullable Long input) { + return input.toString(); + } + }); + } + + @Override + public String getLocation() { + return "in-memory"; + } + + public static class InMemoryTransactionLog implements TransactionLog { + private long timestamp; + private List<TransactionEdit> edits = Lists.newArrayList(); + boolean isClosed = false; + public InMemoryTransactionLog(long timestamp) { + this.timestamp = timestamp; + } + + @Override + public String getName() { + return "in-memory@" + timestamp; + } + + @Override + public long getTimestamp() { + return timestamp; + } + + @Override + public void append(TransactionEdit edit) throws IOException { + if (isClosed) { + throw new IOException("Log is closed"); + } + edits.add(edit); + } + + @Override + public void append(List<TransactionEdit> edits) throws IOException { + if (isClosed) { + throw new IOException("Log is closed"); + } + edits.addAll(edits); + } + + @Override + public void close() { + isClosed = true; + } + + @Override + public TransactionLogReader getReader() throws IOException { + return new InMemoryLogReader(edits.iterator()); + } + } + + public static class InMemoryLogReader implements TransactionLogReader { + private final Iterator<TransactionEdit> editIterator; + + public InMemoryLogReader(Iterator<TransactionEdit> editIterator) { + this.editIterator = editIterator; + } + + @Override + public TransactionEdit next() throws IOException { + if (editIterator.hasNext()) { + return editIterator.next(); + } + return null; + } + + @Override + public TransactionEdit next(TransactionEdit reuse) throws IOException { + return next(); + } + + @Override + public void close() throws IOException { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/persist/LocalTransactionStateStorageTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/LocalTransactionStateStorageTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/LocalTransactionStateStorageTest.java new file mode 100644 index 0000000..9535102 --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/persist/LocalTransactionStateStorageTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.persist; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.ChangeId; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TransactionType; +import org.apache.tephra.TxConstants; +import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.snapshot.DefaultSnapshotCodec; +import org.apache.tephra.snapshot.SnapshotCodecProvider; +import org.apache.tephra.snapshot.SnapshotCodecV4; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.DataOutput; +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Runs transaction persistence tests against the {@link LocalFileTransactionStateStorage} and + * {@link LocalFileTransactionLog} implementations. + */ +public class LocalTransactionStateStorageTest extends AbstractTransactionStateStorageTest { + @ClassRule + public static TemporaryFolder tmpDir = new TemporaryFolder(); + + @Override + protected Configuration getConfiguration(String testName) throws IOException { + File testDir = tmpDir.newFolder(testName); + Configuration conf = new Configuration(); + conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath()); + conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName()); + return conf; + } + + @Override + protected AbstractTransactionStateStorage getStorage(Configuration conf) { + return new LocalFileTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector()); + } + + // v2 TransactionEdit + @SuppressWarnings("deprecation") + private class TransactionEditV2 extends TransactionEdit { + public TransactionEditV2(long writePointer, long visibilityUpperBound, State state, long expirationDate, + Set<ChangeId> changes, long commitPointer, boolean canCommit, TransactionType type) { + super(writePointer, visibilityUpperBound, state, expirationDate, changes, commitPointer, canCommit, type, + null, 0L, 0L, null); + } + @Override + public void write(DataOutput out) throws IOException { + TransactionEditCodecs.encode(this, out, new TransactionEditCodecs.TransactionEditCodecV2()); + } + } + + // Note: this test cannot run in AbstractTransactionStateStorageTest, since SequenceFile throws exception saying + // TransactionEditV2 is not TransactionEdit. Since the code path this test is verifying is the same path between + // HDFS and Local Storage, having this only over here is fine. + @SuppressWarnings("deprecation") + @Test + public void testLongTxnBackwardsCompatibility() throws Exception { + Configuration conf = getConfiguration("testLongTxnBackwardsCompatibility"); + + // Use SnapshotCodec version 1 + String latestSnapshotCodec = conf.get(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES); + conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName()); + + TransactionStateStorage storage = null; + try { + storage = getStorage(conf); + storage.startAndWait(); + + // Create transaction snapshot and transaction edits with version when long running txns had -1 expiration. + Collection<Long> invalid = Lists.newArrayList(); + NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(); + long time1 = System.currentTimeMillis(); + long wp1 = time1 * TxConstants.MAX_TX_PER_MS; + inProgress.put(wp1, new TransactionManager.InProgressTx(wp1 - 5, -1L)); + long time2 = time1 + 100; + long wp2 = time2 * TxConstants.MAX_TX_PER_MS; + inProgress.put(wp2, new TransactionManager.InProgressTx(wp2 - 50, time2 + 1000)); + Map<Long, Set<ChangeId>> committing = Maps.newHashMap(); + Map<Long, Set<ChangeId>> committed = Maps.newHashMap(); + TransactionSnapshot snapshot = new TransactionSnapshot(time2, 0, wp2, invalid, + inProgress, committing, committed); + long time3 = time1 + 200; + long wp3 = time3 * TxConstants.MAX_TX_PER_MS; + TransactionEdit edit1 = new TransactionEditV2(wp3, wp3 - 10, TransactionEdit.State.INPROGRESS, -1L, + null, 0L, false, null); + long time4 = time1 + 300; + long wp4 = time4 * TxConstants.MAX_TX_PER_MS; + TransactionEdit edit2 = new TransactionEditV2(wp4, wp4 - 10, TransactionEdit.State.INPROGRESS, time4 + 1000, + null, 0L, false, null); + + // write snapshot and transaction edit + storage.writeSnapshot(snapshot); + TransactionLog log = storage.createLog(time2); + log.append(edit1); + log.append(edit2); + log.close(); + + // Start transaction manager + conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, latestSnapshotCodec); + long longTimeout = TimeUnit.SECONDS.toMillis(conf.getLong(TxConstants.Manager.CFG_TX_LONG_TIMEOUT, + TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT)); + TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector()); + txm.startAndWait(); + try { + // Verify that the txns in old format were read correctly. + // There should be four in-progress transactions, and no invalid transactions + TransactionSnapshot snapshot1 = txm.getCurrentState(); + Assert.assertEquals(ImmutableSortedSet.of(wp1, wp2, wp3, wp4), snapshot1.getInProgress().keySet()); + verifyInProgress(snapshot1.getInProgress().get(wp1), TransactionType.LONG, time1 + longTimeout); + verifyInProgress(snapshot1.getInProgress().get(wp2), TransactionType.SHORT, time2 + 1000); + verifyInProgress(snapshot1.getInProgress().get(wp3), TransactionType.LONG, time3 + longTimeout); + verifyInProgress(snapshot1.getInProgress().get(wp4), TransactionType.SHORT, time4 + 1000); + Assert.assertEquals(0, snapshot1.getInvalid().size()); + } finally { + txm.stopAndWait(); + } + } finally { + if (storage != null) { + storage.stopAndWait(); + } + } + } + + // Note: this test cannot run in AbstractTransactionStateStorageTest, since SequenceFile throws exception saying + // TransactionEditV2 is not TransactionEdit. Since the code path this test is verifying is the same path between + // HDFS and Local Storage, having this only over here is fine. + @SuppressWarnings("deprecation") + @Test + public void testAbortEditBackwardsCompatibility() throws Exception { + Configuration conf = getConfiguration("testAbortEditBackwardsCompatibility"); + + TransactionStateStorage storage = null; + try { + storage = getStorage(conf); + storage.startAndWait(); + + // Create edits for transaction type addition to abort + long time1 = System.currentTimeMillis(); + long wp1 = time1 * TxConstants.MAX_TX_PER_MS; + TransactionEdit edit1 = new TransactionEditV2(wp1, wp1 - 10, TransactionEdit.State.INPROGRESS, -1L, + null, 0L, false, null); + TransactionEdit edit2 = new TransactionEditV2(wp1, 0L, TransactionEdit.State.ABORTED, 0L, + null, 0L, false, null); + + long time2 = time1 + 400; + long wp2 = time2 * TxConstants.MAX_TX_PER_MS; + TransactionEdit edit3 = new TransactionEditV2(wp2, wp2 - 10, TransactionEdit.State.INPROGRESS, time2 + 10000, + null, 0L, false, null); + TransactionEdit edit4 = new TransactionEditV2(wp2, 0L, TransactionEdit.State.INVALID, 0L, null, 0L, false, null); + // Simulate case where we cannot determine txn state during abort + TransactionEdit edit5 = new TransactionEditV2(wp2, 0L, TransactionEdit.State.ABORTED, 0L, null, 0L, false, null); + + // write snapshot and transaction edit + TransactionLog log = storage.createLog(time1); + log.append(edit1); + log.append(edit2); + log.append(edit3); + log.append(edit4); + log.append(edit5); + log.close(); + + // Start transaction manager + TransactionManager txm = new TransactionManager(conf, storage, new TxMetricsCollector()); + txm.startAndWait(); + try { + // Verify that the txns in old format were read correctly. + // Both transactions should be in invalid state + TransactionSnapshot snapshot1 = txm.getCurrentState(); + Assert.assertEquals(ImmutableList.of(wp1, wp2), snapshot1.getInvalid()); + Assert.assertEquals(0, snapshot1.getInProgress().size()); + Assert.assertEquals(0, snapshot1.getCommittedChangeSets().size()); + Assert.assertEquals(0, snapshot1.getCommittingChangeSets().size()); + } finally { + txm.stopAndWait(); + } + } finally { + if (storage != null) { + storage.stopAndWait(); + } + } + } + + private void verifyInProgress(TransactionManager.InProgressTx inProgressTx, TransactionType type, + long expiration) throws Exception { + Assert.assertEquals(type, inProgressTx.getType()); + Assert.assertTrue(inProgressTx.getExpiration() == expiration); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/persist/TransactionEditTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/TransactionEditTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/TransactionEditTest.java new file mode 100644 index 0000000..46cb81c --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/persist/TransactionEditTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.persist; + +import com.google.common.collect.Sets; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import org.apache.tephra.ChangeId; +import org.apache.tephra.TransactionType; +import org.junit.Assert; +import org.junit.Test; + +import java.io.DataInput; +import java.io.IOException; + +/** + * test for {@link TransactionEdit} + */ +public class TransactionEditTest { + private static final byte[] COL = new byte[] {'c'}; + + @Test + public void testV1SerdeCompat() throws Exception { + TransactionEditCodecs.TransactionEditCodec olderCodec = new TransactionEditCodecs.TransactionEditCodecV1(); + // start tx edit and committed tx edit cover all fields of tx edit + // NOTE: set visibilityUpperBound to 0 and transaction type to null as this is expected default + // for decoding older versions that doesn't store it + verifyDecodingSupportsOlderVersion(TransactionEdit.createStarted(2L, 0L, 1000L, null), olderCodec); + verifyDecodingSupportsOlderVersion( + TransactionEdit.createCommitted(2L, Sets.newHashSet(new ChangeId(COL)), 3L, true), olderCodec); + } + + @Test + public void testV2SerdeCompat() throws Exception { + TransactionEditCodecs.TransactionEditCodec olderCodec = new TransactionEditCodecs.TransactionEditCodecV2(); + // start tx edit and committed tx edit cover all fields of tx edit + // NOTE: transaction type to null as this is expected default for decoding older versions that doesn't store it + verifyDecodingSupportsOlderVersion(TransactionEdit.createStarted(2L, 100L, 1000L, null), olderCodec); + verifyDecodingSupportsOlderVersion( + TransactionEdit.createCommitted(2L, Sets.newHashSet(new ChangeId(COL)), 3L, true), olderCodec); + } + + @SuppressWarnings("deprecation") + private void verifyDecodingSupportsOlderVersion(TransactionEdit edit, + TransactionEditCodecs.TransactionEditCodec olderCodec) + throws IOException { + // encoding with older version of codec + ByteArrayDataOutput out = ByteStreams.newDataOutput(); + TransactionEditCodecs.encode(edit, out, olderCodec); + + // decoding + TransactionEdit decodedEdit = new TransactionEdit(); + DataInput in = ByteStreams.newDataInput(out.toByteArray()); + decodedEdit.readFields(in); + + Assert.assertEquals(edit, decodedEdit); + } + + @Test + public void testSerialization() throws Exception { + assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.SHORT, new long[0])); + assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.SHORT, new long[]{ 2L, 3L })); + assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.LONG, new long[0])); + assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.LONG, new long[]{ 2L, 3L })); + + assertSerializedEdit(TransactionEdit.createCheckpoint(2L, 1L)); + + assertSerializedEdit(TransactionEdit.createCommitted(1L, Sets.<ChangeId>newHashSet(), 2L, false)); + assertSerializedEdit(TransactionEdit.createCommitted(1L, Sets.<ChangeId>newHashSet(), 2L, true)); + assertSerializedEdit(TransactionEdit.createCommitted(1L, + Sets.newHashSet(new ChangeId(new byte[]{'a', 'b', 'c'})), 2L, false)); + assertSerializedEdit(TransactionEdit.createCommitted(1L, + Sets.newHashSet(new ChangeId(new byte[]{ 'a', 'b', 'c' }), new ChangeId(new byte[]{ 'd', 'e', 'f' })), + 2L, true)); + + assertSerializedEdit(TransactionEdit.createCommitting(1L, Sets.<ChangeId>newHashSet())); + assertSerializedEdit(TransactionEdit.createCommitting(1L, + Sets.newHashSet(new ChangeId(new byte[]{'a', 'b', 'c'})))); + assertSerializedEdit(TransactionEdit.createCommitting(1L, + Sets.newHashSet(new ChangeId(new byte[]{'a', 'b', 'c'}), new ChangeId(new byte[]{'d', 'e', 'f'})))); + + assertSerializedEdit(TransactionEdit.createInvalid(1L)); + + assertSerializedEdit(TransactionEdit.createMoveWatermark(10L)); + + assertSerializedEdit(TransactionEdit.createStarted(2L, 1L, System.currentTimeMillis() + 1000, + TransactionType.SHORT)); + assertSerializedEdit(TransactionEdit.createStarted(2L, 1L, System.currentTimeMillis() + 10000, + TransactionType.LONG)); + + assertSerializedEdit(TransactionEdit.createTruncateInvalidTx(Sets.newHashSet(new Long(1)))); + assertSerializedEdit(TransactionEdit.createTruncateInvalidTx( + Sets.newHashSet(new Long(1), new Long(2), new Long(3)))); + + assertSerializedEdit(TransactionEdit.createTruncateInvalidTxBefore(System.currentTimeMillis())); + } + + private void assertSerializedEdit(TransactionEdit originalEdit) throws IOException { + ByteArrayDataOutput out = ByteStreams.newDataOutput(); + originalEdit.write(out); + + TransactionEdit decodedEdit = new TransactionEdit(); + DataInput in = ByteStreams.newDataInput(out.toByteArray()); + decodedEdit.readFields(in); + + Assert.assertEquals(originalEdit, decodedEdit); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java new file mode 100644 index 0000000..afdff5c --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/snapshot/SnapshotCodecTest.java @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.snapshot; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSortedMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.inject.Guice; +import com.google.inject.Injector; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.ChangeId; +import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TransactionNotInProgressException; +import org.apache.tephra.TransactionType; +import org.apache.tephra.TxConstants; +import org.apache.tephra.persist.TransactionSnapshot; +import org.apache.tephra.persist.TransactionStateStorage; +import org.apache.tephra.persist.TransactionVisibilityState; +import org.apache.tephra.runtime.ConfigModule; +import org.apache.tephra.runtime.DiscoveryModules; +import org.apache.tephra.runtime.TransactionModules; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests related to {@link SnapshotCodec} implementations. + */ +public class SnapshotCodecTest { + @ClassRule + public static TemporaryFolder tmpDir = new TemporaryFolder(); + + @Test + public void testMinimalDeserilization() throws Exception { + long now = System.currentTimeMillis(); + long nowWritePointer = now * TxConstants.MAX_TX_PER_MS; + /* + * Snapshot consisting of transactions at: + */ + long tInvalid = nowWritePointer - 5; // t1 - invalid + long readPtr = nowWritePointer - 4; // t2 - here and earlier committed + long tLong = nowWritePointer - 3; // t3 - in-progress LONG + long tCommitted = nowWritePointer - 2; // t4 - committed, changeset (r1, r2) + long tShort = nowWritePointer - 1; // t5 - in-progress SHORT, canCommit called, changeset (r3, r4) + + TreeMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(ImmutableSortedMap.of( + tLong, new TransactionManager.InProgressTx(readPtr, + TransactionManager.getTxExpirationFromWritePointer( + tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT), + TransactionType.LONG), + tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionType.SHORT))); + + TransactionSnapshot snapshot = new TransactionSnapshot(now, readPtr, nowWritePointer, + Lists.newArrayList(tInvalid), // invalid + inProgress, ImmutableMap.<Long, Set<ChangeId>>of( + tShort, Sets.<ChangeId>newHashSet()), + ImmutableMap.<Long, Set<ChangeId>>of( + tCommitted, Sets.<ChangeId>newHashSet())); + + Configuration conf1 = new Configuration(); + conf1.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName()); + SnapshotCodecProvider provider1 = new SnapshotCodecProvider(conf1); + + byte[] byteArray; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + provider1.encode(out, snapshot); + byteArray = out.toByteArray(); + } + + // TransactionSnapshot and TransactionVisibilityState decode should pass now + TransactionSnapshot txSnapshot = provider1.decode(new ByteArrayInputStream(byteArray)); + TransactionVisibilityState txVisibilityState = + provider1.decodeTransactionVisibilityState(new ByteArrayInputStream(byteArray)); + assertTransactionVisibilityStateEquals(txSnapshot, txVisibilityState); + + // Corrupt the serialization byte array so that full deserialization will fail + byteArray[byteArray.length - 1] = 'a'; + + // TransactionVisibilityState decoding should pass since it doesn't decode the committing and committed changesets. + TransactionVisibilityState txVisibilityState2 = provider1.decodeTransactionVisibilityState( + new ByteArrayInputStream(byteArray)); + Assert.assertNotNull(txVisibilityState2); + Assert.assertEquals(txVisibilityState, txVisibilityState2); + Assert.assertEquals(readPtr, txVisibilityState2.getReadPointer()); + try { + provider1.decode(new ByteArrayInputStream(byteArray)); + Assert.fail(); + } catch (RuntimeException e) { + // expected since we modified the serialization bytes + } + } + + /** + * In-progress LONG transactions written with DefaultSnapshotCodec will not have the type serialized as part of + * the data. Since these transactions also contain a non-negative expiration, we need to ensure we reset the type + * correctly when the snapshot is loaded. + */ + @Test + public void testDefaultToV3Compatibility() throws Exception { + long now = System.currentTimeMillis(); + long nowWritePointer = now * TxConstants.MAX_TX_PER_MS; + /* + * Snapshot consisting of transactions at: + */ + long tInvalid = nowWritePointer - 5; // t1 - invalid + long readPtr = nowWritePointer - 4; // t2 - here and earlier committed + long tLong = nowWritePointer - 3; // t3 - in-progress LONG + long tCommitted = nowWritePointer - 2; // t4 - committed, changeset (r1, r2) + long tShort = nowWritePointer - 1; // t5 - in-progress SHORT, canCommit called, changeset (r3, r4) + + TreeMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(ImmutableSortedMap.of( + tLong, new TransactionManager.InProgressTx(readPtr, + TransactionManager.getTxExpirationFromWritePointer(tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT), + TransactionType.LONG), + tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionType.SHORT))); + + TransactionSnapshot snapshot = new TransactionSnapshot(now, readPtr, nowWritePointer, + Lists.newArrayList(tInvalid), // invalid + inProgress, + ImmutableMap.<Long, Set<ChangeId>>of( + tShort, Sets.newHashSet(new ChangeId(new byte[]{'r', '3'}), new ChangeId(new byte[]{'r', '4'}))), + ImmutableMap.<Long, Set<ChangeId>>of( + tCommitted, Sets.newHashSet(new ChangeId(new byte[]{'r', '1'}), new ChangeId(new byte[]{'r', '2'})))); + + Configuration conf1 = new Configuration(); + conf1.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName()); + SnapshotCodecProvider provider1 = new SnapshotCodecProvider(conf1); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try { + provider1.encode(out, snapshot); + } finally { + out.close(); + } + + TransactionSnapshot snapshot2 = provider1.decode(new ByteArrayInputStream(out.toByteArray())); + TransactionVisibilityState minTxSnapshot = provider1.decodeTransactionVisibilityState( + new ByteArrayInputStream(out.toByteArray())); + assertTransactionVisibilityStateEquals(snapshot2, minTxSnapshot); + + assertEquals(snapshot.getReadPointer(), snapshot2.getReadPointer()); + assertEquals(snapshot.getWritePointer(), snapshot2.getWritePointer()); + assertEquals(snapshot.getInvalid(), snapshot2.getInvalid()); + // in-progress transactions will have missing types + assertNotEquals(snapshot.getInProgress(), snapshot2.getInProgress()); + assertEquals(snapshot.getCommittingChangeSets(), snapshot2.getCommittingChangeSets()); + assertEquals(snapshot.getCommittedChangeSets(), snapshot2.getCommittedChangeSets()); + + // after fixing in-progress, full snapshot should match + Map<Long, TransactionManager.InProgressTx> fixedInProgress = TransactionManager.txnBackwardsCompatCheck( + TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT, 10000L, snapshot2.getInProgress()); + assertEquals(snapshot.getInProgress(), fixedInProgress); + assertEquals(snapshot, snapshot2); + } + + /** + * Test full stack serialization for a TransactionManager migrating from DefaultSnapshotCodec to SnapshotCodecV3. + */ + @Test + public void testDefaultToV3Migration() throws Exception { + File testDir = tmpDir.newFolder("testDefaultToV3Migration"); + Configuration conf = new Configuration(); + conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName()); + conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath()); + + Injector injector = Guice.createInjector(new ConfigModule(conf), + new DiscoveryModules().getSingleNodeModules(), new TransactionModules().getSingleNodeModules()); + + TransactionManager txManager = injector.getInstance(TransactionManager.class); + txManager.startAndWait(); + + txManager.startLong(); + + // shutdown to force a snapshot + txManager.stopAndWait(); + + TransactionStateStorage txStorage = injector.getInstance(TransactionStateStorage.class); + txStorage.startAndWait(); + + // confirm that the in-progress entry is missing a type + TransactionSnapshot snapshot = txStorage.getLatestSnapshot(); + TransactionVisibilityState txVisibilityState = txStorage.getLatestTransactionVisibilityState(); + assertTransactionVisibilityStateEquals(snapshot, txVisibilityState); + assertNotNull(snapshot); + assertEquals(1, snapshot.getInProgress().size()); + Map.Entry<Long, TransactionManager.InProgressTx> entry = + snapshot.getInProgress().entrySet().iterator().next(); + assertNull(entry.getValue().getType()); + txStorage.stopAndWait(); + + + // start a new Tx manager to test fixup + Configuration conf2 = new Configuration(); + conf2.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath()); + conf2.setStrings(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, + DefaultSnapshotCodec.class.getName(), SnapshotCodecV3.class.getName()); + Injector injector2 = Guice.createInjector(new ConfigModule(conf2), + new DiscoveryModules().getSingleNodeModules(), new TransactionModules().getSingleNodeModules()); + + TransactionManager txManager2 = injector2.getInstance(TransactionManager.class); + txManager2.startAndWait(); + + // state should be recovered + TransactionSnapshot snapshot2 = txManager2.getCurrentState(); + assertEquals(1, snapshot2.getInProgress().size()); + Map.Entry<Long, TransactionManager.InProgressTx> inProgressTx = + snapshot2.getInProgress().entrySet().iterator().next(); + assertEquals(TransactionType.LONG, inProgressTx.getValue().getType()); + + // save a new snapshot + txManager2.stopAndWait(); + + TransactionStateStorage txStorage2 = injector2.getInstance(TransactionStateStorage.class); + txStorage2.startAndWait(); + + TransactionSnapshot snapshot3 = txStorage2.getLatestSnapshot(); + // full snapshot should have deserialized correctly without any fixups + assertEquals(snapshot2.getInProgress(), snapshot3.getInProgress()); + assertEquals(snapshot2, snapshot3); + txStorage2.stopAndWait(); + } + + @Test + public void testSnapshotCodecProviderConfiguration() throws Exception { + Configuration conf = new Configuration(false); + StringBuilder buf = new StringBuilder(); + for (Class c : TxConstants.Persist.DEFAULT_TX_SNAPHOT_CODEC_CLASSES) { + if (buf.length() > 0) { + buf.append(",\n "); + } + buf.append(c.getName()); + } + conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, buf.toString()); + + SnapshotCodecProvider codecProvider = new SnapshotCodecProvider(conf); + SnapshotCodec v1codec = codecProvider.getCodecForVersion(new DefaultSnapshotCodec().getVersion()); + assertNotNull(v1codec); + assertTrue(v1codec instanceof DefaultSnapshotCodec); + + SnapshotCodec v2codec = codecProvider.getCodecForVersion(new SnapshotCodecV2().getVersion()); + assertNotNull(v2codec); + assertTrue(v2codec instanceof SnapshotCodecV2); + + SnapshotCodec v3codec = codecProvider.getCodecForVersion(new SnapshotCodecV3().getVersion()); + assertNotNull(v3codec); + assertTrue(v3codec instanceof SnapshotCodecV3); + + SnapshotCodec v4codec = codecProvider.getCodecForVersion(new SnapshotCodecV4().getVersion()); + assertNotNull(v4codec); + assertTrue(v4codec instanceof SnapshotCodecV4); + } + + @Test + public void testSnapshotCodecV4() throws IOException, TransactionNotInProgressException { + File testDir = tmpDir.newFolder("testSnapshotCodecV4"); + Configuration conf = new Configuration(); + conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName()); + conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath()); + + Injector injector = Guice.createInjector(new ConfigModule(conf), + new DiscoveryModules().getSingleNodeModules(), + new TransactionModules().getSingleNodeModules()); + + TransactionManager txManager = injector.getInstance(TransactionManager.class); + txManager.startAndWait(); + + // Create a transaction and a checkpoint transaction + Transaction transaction = txManager.startLong(); + Transaction checkpointTx = txManager.checkpoint(transaction); + + // shutdown to force a snapshot + txManager.stopAndWait(); + + // Validate the snapshot on disk + TransactionStateStorage txStorage = injector.getInstance(TransactionStateStorage.class); + txStorage.startAndWait(); + + TransactionSnapshot snapshot = txStorage.getLatestSnapshot(); + TransactionVisibilityState txVisibilityState = txStorage.getLatestTransactionVisibilityState(); + assertTransactionVisibilityStateEquals(snapshot, txVisibilityState); + + Map<Long, TransactionManager.InProgressTx> inProgress = snapshot.getInProgress(); + Assert.assertEquals(1, inProgress.size()); + + TransactionManager.InProgressTx inProgressTx = inProgress.get(transaction.getTransactionId()); + Assert.assertNotNull(inProgressTx); + Assert.assertArrayEquals(checkpointTx.getCheckpointWritePointers(), + inProgressTx.getCheckpointWritePointers().toLongArray()); + + txStorage.stopAndWait(); + + // start a new Tx manager to see if the transaction is restored correctly. + Injector injector2 = Guice.createInjector(new ConfigModule(conf), + new DiscoveryModules().getSingleNodeModules(), + new TransactionModules().getSingleNodeModules()); + + txManager = injector2.getInstance(TransactionManager.class); + txManager.startAndWait(); + + // state should be recovered + snapshot = txManager.getCurrentState(); + inProgress = snapshot.getInProgress(); + Assert.assertEquals(1, inProgress.size()); + + inProgressTx = inProgress.get(transaction.getTransactionId()); + Assert.assertNotNull(inProgressTx); + Assert.assertArrayEquals(checkpointTx.getCheckpointWritePointers(), + inProgressTx.getCheckpointWritePointers().toLongArray()); + + // Should be able to commit the transaction + Assert.assertTrue(txManager.canCommit(checkpointTx, Collections.<byte[]>emptyList())); + Assert.assertTrue(txManager.commit(checkpointTx)); + + // save a new snapshot + txManager.stopAndWait(); + + TransactionStateStorage txStorage2 = injector2.getInstance(TransactionStateStorage.class); + txStorage2.startAndWait(); + + snapshot = txStorage2.getLatestSnapshot(); + Assert.assertTrue(snapshot.getInProgress().isEmpty()); + txStorage2.stopAndWait(); + } + + private void assertTransactionVisibilityStateEquals(TransactionVisibilityState expected, + TransactionVisibilityState input) { + Assert.assertEquals(expected.getTimestamp(), input.getTimestamp()); + Assert.assertEquals(expected.getReadPointer(), input.getReadPointer()); + Assert.assertEquals(expected.getWritePointer(), input.getWritePointer()); + Assert.assertEquals(expected.getInProgress(), input.getInProgress()); + Assert.assertEquals(expected.getInvalid(), input.getInvalid()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/org/apache/tephra/util/AbstractConfigurationProviderTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/util/AbstractConfigurationProviderTest.java b/tephra-core/src/test/java/org/apache/tephra/util/AbstractConfigurationProviderTest.java new file mode 100644 index 0000000..8526b75 --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/util/AbstractConfigurationProviderTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.util; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * + */ +public abstract class AbstractConfigurationProviderTest { + @Test + public void testVersionFactory() { + HBaseVersion.Version foundVersion = HBaseVersion.get(); + assertEquals(getExpectedVersion(), foundVersion); + } + + protected abstract HBaseVersion.Version getExpectedVersion(); + + @Test + public void testConfigurationProvider() { + Configuration conf = new Configuration(); + conf.set("foo", "bar"); + Configuration newConf = new ConfigurationFactory().get(conf); + assertNotNull(newConf); + assertEquals("bar", newConf.get("foo")); + } +}
