HDFS-12224. Add tests to TestJournalNodeSync for sync after JN downtime. Contributed by Hanisha Koneru.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bbc6d254 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bbc6d254 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bbc6d254 Branch: refs/heads/HADOOP-13345 Commit: bbc6d254c8a953abba69415d80edeede3ee6269d Parents: fe33417 Author: Arpit Agarwal <a...@apache.org> Authored: Fri Aug 4 12:51:33 2017 -0700 Committer: Arpit Agarwal <a...@apache.org> Committed: Fri Aug 4 12:51:33 2017 -0700 ---------------------------------------------------------------------- .../hadoop/hdfs/qjournal/server/Journal.java | 3 +- .../hdfs/qjournal/server/JournalMetrics.java | 11 + .../hdfs/qjournal/server/JournalNodeSyncer.java | 4 + .../hdfs/qjournal/TestJournalNodeSync.java | 265 ----------- .../hdfs/qjournal/server/TestJournalNode.java | 6 +- .../qjournal/server/TestJournalNodeSync.java | 439 +++++++++++++++++++ 6 files changed, 458 insertions(+), 270 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbc6d254/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java index 0041d5e..0f4091d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java @@ -286,8 +286,7 @@ public class Journal implements Closeable { fjm.setLastReadableTxId(val); } - @VisibleForTesting - JournalMetrics getMetricsForTests() { + JournalMetrics getMetrics() { return metrics; } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbc6d254/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java index cffe2c1..fcfd901 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java @@ -45,6 +45,9 @@ class JournalMetrics { @Metric("Number of batches written where this node was lagging") MutableCounterLong batchesWrittenWhileLagging; + + @Metric("Number of edit logs downloaded by JournalNodeSyncer") + private MutableCounterLong numEditLogsSynced; private final int[] QUANTILE_INTERVALS = new int[] { 1*60, // 1m @@ -120,4 +123,12 @@ class JournalMetrics { q.add(us); } } + + public MutableCounterLong getNumEditLogsSynced() { + return numEditLogsSynced; + } + + public void incrNumEditLogsSynced() { + numEditLogsSynced.incr(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbc6d254/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java index 479f6a0..537ba0a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java @@ 
-77,6 +77,7 @@ public class JournalNodeSyncer { private final long journalSyncInterval; private final int logSegmentTransferTimeout; private final DataTransferThrottler throttler; + private final JournalMetrics metrics; JournalNodeSyncer(JournalNode jouranlNode, Journal journal, String jid, Configuration conf) { @@ -93,6 +94,7 @@ public class JournalNodeSyncer { DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY, DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT); throttler = getThrottler(conf); + metrics = journal.getMetrics(); } void stopSync() { @@ -411,6 +413,8 @@ public class JournalNodeSyncer { LOG.warn("Deleting " + tmpEditsFile + " has failed"); } return false; + } else { + metrics.incrNumEditLogsSynced(); } return true; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbc6d254/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java deleted file mode 100644 index 8415a6f..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java +++ /dev/null @@ -1,265 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.qjournal; - -import com.google.common.base.Supplier; -import com.google.common.collect.Lists; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; -import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; -import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; -import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager - .getLogFile; - -import org.apache.hadoop.test.GenericTestUtils; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.Random; - -/** - * Unit test for Journal Node formatting upon re-installation and syncing. 
- */ -public class TestJournalNodeSync { - private MiniQJMHACluster qjmhaCluster; - private MiniDFSCluster dfsCluster; - private MiniJournalCluster jCluster; - private FileSystem fs; - private FSNamesystem namesystem; - private int editsPerformed = 0; - private final String jid = "ns1"; - - @Before - public void setUpMiniCluster() throws IOException { - final Configuration conf = new HdfsConfiguration(); - conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, true); - conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 1000L); - qjmhaCluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2) - .build(); - dfsCluster = qjmhaCluster.getDfsCluster(); - jCluster = qjmhaCluster.getJournalCluster(); - - dfsCluster.transitionToActive(0); - fs = dfsCluster.getFileSystem(0); - namesystem = dfsCluster.getNamesystem(0); - } - - @After - public void shutDownMiniCluster() throws IOException { - if (qjmhaCluster != null) { - qjmhaCluster.shutdown(); - } - } - - @Test(timeout=30000) - public void testJournalNodeSync() throws Exception { - File firstJournalDir = jCluster.getJournalDir(0, jid); - File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) - .getCurrentDir(); - - // Generate some edit logs and delete one. - long firstTxId = generateEditLog(); - generateEditLog(); - - File missingLog = deleteEditLog(firstJournalCurrentDir, firstTxId); - - GenericTestUtils.waitFor(editLogExists(Lists.newArrayList(missingLog)), - 500, 10000); - } - - @Test(timeout=30000) - public void testSyncForMultipleMissingLogs() throws Exception { - File firstJournalDir = jCluster.getJournalDir(0, jid); - File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) - .getCurrentDir(); - - // Generate some edit logs and delete two. 
- long firstTxId = generateEditLog(); - long nextTxId = generateEditLog(); - - List<File> missingLogs = Lists.newArrayList(); - missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId)); - missingLogs.add(deleteEditLog(firstJournalCurrentDir, nextTxId)); - - GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000); - } - - @Test(timeout=30000) - public void testSyncForDiscontinuousMissingLogs() throws Exception { - File firstJournalDir = jCluster.getJournalDir(0, jid); - File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) - .getCurrentDir(); - - // Generate some edit logs and delete two discontinuous logs. - long firstTxId = generateEditLog(); - generateEditLog(); - long nextTxId = generateEditLog(); - - List<File> missingLogs = Lists.newArrayList(); - missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId)); - missingLogs.add(deleteEditLog(firstJournalCurrentDir, nextTxId)); - - GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000); - } - - @Test(timeout=30000) - public void testMultipleJournalsMissingLogs() throws Exception { - File firstJournalDir = jCluster.getJournalDir(0, jid); - File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) - .getCurrentDir(); - - File secondJournalDir = jCluster.getJournalDir(1, jid); - StorageDirectory sd = new StorageDirectory(secondJournalDir); - File secondJournalCurrentDir = sd.getCurrentDir(); - - // Generate some edit logs and delete one log from two journals. 
- long firstTxId = generateEditLog(); - generateEditLog(); - - List<File> missingLogs = Lists.newArrayList(); - missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId)); - missingLogs.add(deleteEditLog(secondJournalCurrentDir, firstTxId)); - - GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000); - } - - @Test(timeout=60000) - public void testMultipleJournalsMultipleMissingLogs() throws Exception { - File firstJournalDir = jCluster.getJournalDir(0, jid); - File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) - .getCurrentDir(); - - File secondJournalDir = jCluster.getJournalDir(1, jid); - File secondJournalCurrentDir = new StorageDirectory(secondJournalDir) - .getCurrentDir(); - - File thirdJournalDir = jCluster.getJournalDir(2, jid); - File thirdJournalCurrentDir = new StorageDirectory(thirdJournalDir) - .getCurrentDir(); - - // Generate some edit logs and delete multiple logs in multiple journals. - long firstTxId = generateEditLog(); - long secondTxId = generateEditLog(); - long thirdTxId = generateEditLog(); - - List<File> missingLogs = Lists.newArrayList(); - missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId)); - missingLogs.add(deleteEditLog(secondJournalCurrentDir, firstTxId)); - missingLogs.add(deleteEditLog(secondJournalCurrentDir, secondTxId)); - missingLogs.add(deleteEditLog(thirdJournalCurrentDir, thirdTxId)); - - GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000); - } - - // Test JournalNode Sync by randomly deleting edit logs from one or two of - // the journals. 
- @Test(timeout=60000) - public void testRandomJournalMissingLogs() throws Exception { - Random randomJournal = new Random(); - - List<File> journalCurrentDirs = Lists.newArrayList(); - - for (int i = 0; i < 3; i++) { - journalCurrentDirs.add(new StorageDirectory(jCluster.getJournalDir(i, - jid)).getCurrentDir()); - } - - int count = 0; - long lastStartTxId; - int journalIndex; - List<File> missingLogs = Lists.newArrayList(); - while (count < 5) { - lastStartTxId = generateEditLog(); - - // Delete the last edit log segment from randomly selected journal node - journalIndex = randomJournal.nextInt(3); - missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex), - lastStartTxId)); - - // Delete the last edit log segment from two journals for some logs - if (count % 2 == 0) { - journalIndex = (journalIndex + 1) % 3; - missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex), - lastStartTxId)); - } - - count++; - } - - GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000); - } - - private File deleteEditLog(File currentDir, long startTxId) - throws IOException { - EditLogFile logFile = getLogFile(currentDir, startTxId); - while (logFile.isInProgress()) { - dfsCluster.getNameNode(0).getRpcServer().rollEditLog(); - logFile = getLogFile(currentDir, startTxId); - } - File deleteFile = logFile.getFile(); - Assert.assertTrue("Couldn't delete edit log file", deleteFile.delete()); - - return deleteFile; - } - - /** - * Do a mutative metadata operation on the file system. - * - * @return true if the operation was successful, false otherwise. - */ - private boolean doAnEdit() throws IOException { - return fs.mkdirs(new Path("/tmp", Integer.toString(editsPerformed++))); - } - - /** - * Does an edit and rolls the Edit Log. - * - * @return the startTxId of next segment after rolling edits. 
- */ - private long generateEditLog() throws IOException { - long startTxId = namesystem.getFSImage().getEditLog().getLastWrittenTxId(); - Assert.assertTrue("Failed to do an edit", doAnEdit()); - dfsCluster.getNameNode(0).getRpcServer().rollEditLog(); - return startTxId; - } - - private Supplier<Boolean> editLogExists(List<File> editLogs) { - Supplier<Boolean> supplier = new Supplier<Boolean>() { - @Override - public Boolean get() { - for (File editLog : editLogs) { - if (!editLog.exists()) { - return false; - } - } - return true; - } - }; - return supplier; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbc6d254/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java index 9dd6846..28ec708 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java @@ -102,7 +102,7 @@ public class TestJournalNode { @Test(timeout=100000) public void testJournal() throws Exception { MetricsRecordBuilder metrics = MetricsAsserts.getMetrics( - journal.getMetricsForTests().getName()); + journal.getMetrics().getName()); MetricsAsserts.assertCounter("BatchesWritten", 0L, metrics); MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics); MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics); @@ -117,7 +117,7 @@ public class TestJournalNode { ch.sendEdits(1L, 1, 1, "hello".getBytes(Charsets.UTF_8)).get(); metrics = MetricsAsserts.getMetrics( - journal.getMetricsForTests().getName()); + journal.getMetrics().getName()); 
MetricsAsserts.assertCounter("BatchesWritten", 1L, metrics); MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics); MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics); @@ -130,7 +130,7 @@ public class TestJournalNode { ch.sendEdits(1L, 2, 1, "goodbye".getBytes(Charsets.UTF_8)).get(); metrics = MetricsAsserts.getMetrics( - journal.getMetricsForTests().getName()); + journal.getMetrics().getName()); MetricsAsserts.assertCounter("BatchesWritten", 2L, metrics); MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 1L, metrics); MetricsAsserts.assertGauge("CurrentLagTxns", 98L, metrics); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbc6d254/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java new file mode 100644 index 0000000..2964f05 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java @@ -0,0 +1,439 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.qjournal.server; + +import com.google.common.base.Supplier; +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster; +import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; +import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; +import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager + .getLogFile; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Random; + +/** + * Unit test for Journal Node formatting upon re-installation and syncing. 
+ */ +public class TestJournalNodeSync { + private Configuration conf; + private MiniQJMHACluster qjmhaCluster; + private MiniDFSCluster dfsCluster; + private MiniJournalCluster jCluster; + private FileSystem fs; + private FSNamesystem namesystem; + private int editsPerformed = 0; + private final String jid = "ns1"; + + @Rule + public TestName testName = new TestName(); + + @Before + public void setUpMiniCluster() throws IOException { + conf = new HdfsConfiguration(); + conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, true); + conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 1000L); + if (testName.getMethodName().equals( + "testSyncAfterJNdowntimeWithoutQJournalQueue")) { + conf.setInt(DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY, 0); + } + qjmhaCluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2) + .build(); + dfsCluster = qjmhaCluster.getDfsCluster(); + jCluster = qjmhaCluster.getJournalCluster(); + + dfsCluster.transitionToActive(0); + fs = dfsCluster.getFileSystem(0); + namesystem = dfsCluster.getNamesystem(0); + } + + @After + public void shutDownMiniCluster() throws IOException { + if (qjmhaCluster != null) { + qjmhaCluster.shutdown(); + } + } + + @Test(timeout=30000) + public void testJournalNodeSync() throws Exception { + File firstJournalDir = jCluster.getJournalDir(0, jid); + File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) + .getCurrentDir(); + + // Generate some edit logs and delete one. + long firstTxId = generateEditLog(); + generateEditLog(); + + File missingLog = deleteEditLog(firstJournalCurrentDir, firstTxId); + + GenericTestUtils.waitFor(editLogExists(Lists.newArrayList(missingLog)), + 500, 10000); + } + + @Test(timeout=30000) + public void testSyncForMultipleMissingLogs() throws Exception { + File firstJournalDir = jCluster.getJournalDir(0, jid); + File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) + .getCurrentDir(); + + // Generate some edit logs and delete two. 
+ long firstTxId = generateEditLog(); + long nextTxId = generateEditLog(); + + List<File> missingLogs = Lists.newArrayList(); + missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId)); + missingLogs.add(deleteEditLog(firstJournalCurrentDir, nextTxId)); + + GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000); + } + + @Test(timeout=30000) + public void testSyncForDiscontinuousMissingLogs() throws Exception { + File firstJournalDir = jCluster.getJournalDir(0, jid); + File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) + .getCurrentDir(); + + // Generate some edit logs and delete two discontinuous logs. + long firstTxId = generateEditLog(); + generateEditLog(); + long nextTxId = generateEditLog(); + + List<File> missingLogs = Lists.newArrayList(); + missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId)); + missingLogs.add(deleteEditLog(firstJournalCurrentDir, nextTxId)); + + GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000); + } + + @Test(timeout=30000) + public void testMultipleJournalsMissingLogs() throws Exception { + File firstJournalDir = jCluster.getJournalDir(0, jid); + File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) + .getCurrentDir(); + + File secondJournalDir = jCluster.getJournalDir(1, jid); + StorageDirectory sd = new StorageDirectory(secondJournalDir); + File secondJournalCurrentDir = sd.getCurrentDir(); + + // Generate some edit logs and delete one log from two journals. 
+ long firstTxId = generateEditLog(); + generateEditLog(); + + List<File> missingLogs = Lists.newArrayList(); + missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId)); + missingLogs.add(deleteEditLog(secondJournalCurrentDir, firstTxId)); + + GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000); + } + + @Test(timeout=60000) + public void testMultipleJournalsMultipleMissingLogs() throws Exception { + File firstJournalDir = jCluster.getJournalDir(0, jid); + File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) + .getCurrentDir(); + + File secondJournalDir = jCluster.getJournalDir(1, jid); + File secondJournalCurrentDir = new StorageDirectory(secondJournalDir) + .getCurrentDir(); + + File thirdJournalDir = jCluster.getJournalDir(2, jid); + File thirdJournalCurrentDir = new StorageDirectory(thirdJournalDir) + .getCurrentDir(); + + // Generate some edit logs and delete multiple logs in multiple journals. + long firstTxId = generateEditLog(); + long secondTxId = generateEditLog(); + long thirdTxId = generateEditLog(); + + List<File> missingLogs = Lists.newArrayList(); + missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId)); + missingLogs.add(deleteEditLog(secondJournalCurrentDir, firstTxId)); + missingLogs.add(deleteEditLog(secondJournalCurrentDir, secondTxId)); + missingLogs.add(deleteEditLog(thirdJournalCurrentDir, thirdTxId)); + + GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000); + } + + // Test JournalNode Sync by randomly deleting edit logs from one or two of + // the journals. 
+ @Test(timeout=60000) + public void testRandomJournalMissingLogs() throws Exception { + Random randomJournal = new Random(); + + List<File> journalCurrentDirs = Lists.newArrayList(); + + for (int i = 0; i < 3; i++) { + journalCurrentDirs.add(new StorageDirectory(jCluster.getJournalDir(i, + jid)).getCurrentDir()); + } + + int count = 0; + long lastStartTxId; + int journalIndex; + List<File> missingLogs = Lists.newArrayList(); + while (count < 5) { + lastStartTxId = generateEditLog(); + + // Delete the last edit log segment from randomly selected journal node + journalIndex = randomJournal.nextInt(3); + missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex), + lastStartTxId)); + + // Delete the last edit log segment from two journals for some logs + if (count % 2 == 0) { + journalIndex = (journalIndex + 1) % 3; + missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex), + lastStartTxId)); + } + + count++; + } + + GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000); + } + + // Test JournalNode Sync when a JN is down while NN is actively writing + // logs and comes back up after some time. 
+ @Test (timeout=300_000) + public void testSyncAfterJNdowntime() throws Exception { + File firstJournalDir = jCluster.getJournalDir(0, jid); + File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) + .getCurrentDir(); + File secondJournalDir = jCluster.getJournalDir(1, jid); + File secondJournalCurrentDir = new StorageDirectory(secondJournalDir) + .getCurrentDir(); + + long[] startTxIds = new long[10]; + + startTxIds[0] = generateEditLog(); + startTxIds[1] = generateEditLog(); + + // Stop the first JN + jCluster.getJournalNode(0).stop(0); + + // Roll some more edits while the first JN is down + for (int i = 2; i < 10; i++) { + startTxIds[i] = generateEditLog(5); + } + + // Re-start the first JN + jCluster.restartJournalNode(0); + + // Roll an edit to update the committed tx id of the first JN + generateEditLog(); + + // List the edit logs rolled during JN down time. + List<File> missingLogs = Lists.newArrayList(); + for (int i = 2; i < 10; i++) { + EditLogFile logFile = getLogFile(secondJournalCurrentDir, startTxIds[i], + false); + missingLogs.add(new File(firstJournalCurrentDir, + logFile.getFile().getName())); + } + + // Check that JNSync downloaded the edit logs rolled during JN down time. + GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000); + } + + /** + * Test JournalNode Sync when a JN is down while NN is actively writing + * logs and comes back up after some time with no edit log queueing. 
+ * Queuing is disabled during the cluster setup {@link #setUpMiniCluster()} + * @throws Exception + */ + @Test (timeout=300_000) + public void testSyncAfterJNdowntimeWithoutQJournalQueue() throws Exception{ + // Queuing is disabled during the cluster setup {@link #setUpMiniCluster()} + File firstJournalDir = jCluster.getJournalDir(0, jid); + File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) + .getCurrentDir(); + File secondJournalDir = jCluster.getJournalDir(1, jid); + File secondJournalCurrentDir = new StorageDirectory(secondJournalDir) + .getCurrentDir(); + + long[] startTxIds = new long[10]; + + startTxIds[0] = generateEditLog(); + startTxIds[1] = generateEditLog(2); + + // Stop the first JN + jCluster.getJournalNode(0).stop(0); + + // Roll some more edits while the first JN is down + for (int i = 2; i < 10; i++) { + startTxIds[i] = generateEditLog(5); + } + + // Re-start the first JN + jCluster.restartJournalNode(0); + + // After JN restart and before rolling another edit, the missing edit + // logs will not be synced as the committed tx id of the JN will be + // less than the start tx ids of the missing edit logs and edit log queuing + // has been disabled. + // Roll an edit to update the committed tx id of the first JN + generateEditLog(2); + + // List the edit logs rolled during JN down time. + List<File> missingLogs = Lists.newArrayList(); + for (int i = 2; i < 10; i++) { + EditLogFile logFile = getLogFile(secondJournalCurrentDir, startTxIds[i], + false); + missingLogs.add(new File(firstJournalCurrentDir, + logFile.getFile().getName())); + } + + // Check that JNSync downloaded the edit logs rolled during JN down time. 
+ GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000); + + // Check that all the missing edit logs have been downloaded via + // JournalNodeSyncer alone (as the edit log queueing has been disabled) + long numEditLogsSynced = jCluster.getJournalNode(0).getOrCreateJournal(jid) + .getMetrics().getNumEditLogsSynced().value(); + Assert.assertTrue("Edit logs downloaded outside syncer. Expected 8 or " + + "more downloads, got " + numEditLogsSynced + " downloads instead", + numEditLogsSynced >= 8); + } + + // Test JournalNode Sync when a JN is formatted while NN is actively writing + // logs. + @Test (timeout=300_000) + public void testSyncAfterJNformat() throws Exception{ + File firstJournalDir = jCluster.getJournalDir(0, jid); + File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) + .getCurrentDir(); + File secondJournalDir = jCluster.getJournalDir(1, jid); + File secondJournalCurrentDir = new StorageDirectory(secondJournalDir) + .getCurrentDir(); + + long[] startTxIds = new long[10]; + + startTxIds[0] = generateEditLog(1); + startTxIds[1] = generateEditLog(2); + startTxIds[2] = generateEditLog(4); + startTxIds[3] = generateEditLog(6); + + Journal journal1 = jCluster.getJournalNode(0).getOrCreateJournal(jid); + NamespaceInfo nsInfo = journal1.getStorage().getNamespaceInfo(); + + // Delete contents of current directory of one JN + for (File file : firstJournalCurrentDir.listFiles()) { + file.delete(); + } + + // Format the JN + journal1.format(nsInfo); + + // Roll some more edits + for (int i = 4; i < 10; i++) { + startTxIds[i] = generateEditLog(5); + } + + // List all the edit logs, both those deleted before the format and those rolled after it. + List<File> missingLogs = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + EditLogFile logFile = getLogFile(secondJournalCurrentDir, startTxIds[i], + false); + missingLogs.add(new File(firstJournalCurrentDir, + logFile.getFile().getName())); + } + + // Check that the formatted JN has all the edit logs. 
+ GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000); + } + + private File deleteEditLog(File currentDir, long startTxId) + throws IOException { + EditLogFile logFile = getLogFile(currentDir, startTxId); + while (logFile.isInProgress()) { + dfsCluster.getNameNode(0).getRpcServer().rollEditLog(); + logFile = getLogFile(currentDir, startTxId); + } + File deleteFile = logFile.getFile(); + Assert.assertTrue("Couldn't delete edit log file", deleteFile.delete()); + + return deleteFile; + } + + /** + * Do a mutative metadata operation on the file system. + * + * @return true if the operation was successful, false otherwise. + */ + private boolean doAnEdit() throws IOException { + return fs.mkdirs(new Path("/tmp", Integer.toString(editsPerformed++))); + } + + /** + * Does an edit and rolls the Edit Log. + * + * @return the last written txid before the edit, i.e. the start txid of the segment finalized by this roll. + */ + private long generateEditLog() throws IOException { + return generateEditLog(1); + } + + /** + * Does specified number of edits and rolls the Edit Log. + * + * @param numEdits number of Edits to perform + * @return the last written txid before the edits, i.e. the start txid of the segment finalized by this roll. + */ + private long generateEditLog(int numEdits) throws IOException { + long startTxId = namesystem.getFSImage().getEditLog().getLastWrittenTxId(); + for (int i = 1; i <= numEdits; i++) { + Assert.assertTrue("Failed to do an edit", doAnEdit()); + } + dfsCluster.getNameNode(0).getRpcServer().rollEditLog(); + return startTxId; + } + + private Supplier<Boolean> editLogExists(List<File> editLogs) { + Supplier<Boolean> supplier = new Supplier<Boolean>() { + @Override + public Boolean get() { + for (File editLog : editLogs) { + if (!editLog.exists()) { + return false; + } + } + return true; + } + }; + return supplier; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org