xkrogen commented on code in PR #4744:
URL: https://github.com/apache/hadoop/pull/4744#discussion_r955294947
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java:
##########
@@ -197,10 +197,9 @@ public void testMismatchedNNIsRejected() throws Exception {
.manageNameDfsDirs(false).format(false).checkExitOnShutdown(false)
.build();
fail("New NN with different namespace should have been rejected");
- } catch (ExitException ee) {
+ } catch (IOException ie) {
GenericTestUtils.assertExceptionContains(
- "Unable to start log segment 1: too few journals", ee);
- assertTrue("Didn't terminate properly ", ExitUtil.terminateCalled());
+ "recoverUnfinalizedSegments failed for too many journals", ie);
Review Comment:
I wonder if we should modify the caller to catch the `IOException` and
rethrow as `ExitException` to match previous behavior?
##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java:
##########
@@ -1657,16 +1657,11 @@ synchronized void logEdit(final int length, final
byte[] data) {
/**
* Run recovery on all journals to recover any unclosed segments
*/
- synchronized void recoverUnclosedStreams() {
+ synchronized void recoverUnclosedStreams() throws IOException {
Preconditions.checkState(
state == State.BETWEEN_LOG_SEGMENTS,
"May not recover segments - wrong state: %s", state);
- try {
- journalSet.recoverUnfinalizedSegments();
- } catch (IOException ex) {
- // All journals have failed, it is handled in logSync.
- // TODO: are we sure this is OK?
- }
+ journalSet.recoverUnfinalizedSegments();
Review Comment:
This looks right to me as we've been discussing, but I would appreciate
another pair of eyes on it to see if I'm missing anything. @omalley can you
take a look? (see discussion above on why we're making this change)
##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java:
##########
@@ -299,33 +299,28 @@ public void catchupDuringFailover() throws IOException {
// Important to do tailing as the login user, in case the shared
// edits storage is implemented by a JournalManager that depends
// on security credentials to access the logs (eg QuorumJournalManager).
- SecurityUtil.doAsLoginUser(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- long editsTailed = 0;
- // Fully tail the journal to the end
- do {
- long startTime = timer.monotonicNow();
- try {
- NameNode.getNameNodeMetrics().addEditLogTailInterval(
- startTime - lastLoadTimeMs);
- // It is already under the name system lock and the checkpointer
- // thread is already stopped. No need to acquire any other lock.
- editsTailed = doTailEdits();
- } catch (InterruptedException e) {
- throw new IOException(e);
- } finally {
- NameNode.getNameNodeMetrics().addEditLogTailTime(
- timer.monotonicNow() - startTime);
- }
- } while(editsTailed > 0);
- return null;
+ SecurityUtil.doAsLoginUser((PrivilegedExceptionAction<Void>) () -> {
+ long startTime = timer.monotonicNow();
+ try {
+ NameNode.getNameNodeMetrics().addEditLogTailInterval((startTime -
lastLoadTimeMs));
Review Comment:
why did you remove the do-while loop?
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/SpyQJournalUtil.java:
##########
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import org.apache.hadoop.conf.Configuration;
+import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
+import
org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
+import
org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
+import
org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.eq;
+
+/**
+ * One Util class to mock QJuournals for some UTs not in this package.
+ */
+public final class SpyQJournalUtil {
+
+ private SpyQJournalUtil() {
+ }
+
+ /**
+ * Mock a QuorumJournalManager with input uri, nsInfo and namServiceId.
+ * @param conf input configuration.
+ * @param uri input uri.
+ * @param nsInfo input nameservice info.
+ * @param nameServiceId input nameservice Id.
+ * @return one mocked QuorumJournalManager.
+ * @throws IOException throw IOException.
+ */
+ public static QuorumJournalManager createSpyingQJM(Configuration conf,
+ URI uri, NamespaceInfo nsInfo, String nameServiceId) throws IOException {
+ AsyncLogger.Factory spyFactory = (conf1, nsInfo1, journalId1,
nameServiceId1, addr1) -> {
+ AsyncLogger logger = new IPCLoggerChannel(conf1, nsInfo1, journalId1,
nameServiceId1, addr1);
+ return Mockito.spy(logger);
+ };
+ return new QuorumJournalManager(conf, uri, nsInfo, nameServiceId,
spyFactory);
+ }
Review Comment:
This looks mostly copy-pasted from
`TestQuorumJournalManager#createSpyingQJM()`, can we change
`TestQuorumJournalManager` to use this utility to reduce duplication?
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHAWithInProgressTail.java:
##########
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.qjournal.client.SpyQJournalUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerFaultInjector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static
org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getFileInfo;
+import static org.junit.Assert.assertNotNull;
+
+public class TestHAWithInProgressTail {
+ private MiniQJMHACluster qjmhaCluster;
+ private MiniDFSCluster cluster;
+ private MiniJournalCluster jnCluster;
+ private NameNode nn0;
+ private NameNode nn1;
+
+ @Before
+ public void startUp() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
+ conf.setInt(DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY,
500);
+ HAUtil.setAllowStandbyReads(conf, true);
+ qjmhaCluster = new MiniQJMHACluster.Builder(conf).build();
+ cluster = qjmhaCluster.getDfsCluster();
+ jnCluster = qjmhaCluster.getJournalCluster();
+
+ // Get NameNode from cluster to future manual control
+ nn0 = cluster.getNameNode(0);
+ nn1 = cluster.getNameNode(1);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (qjmhaCluster != null) {
+ qjmhaCluster.shutdown();
+ }
+ }
+
+
+ /**
+ * Test that Standby Node tails multiple segments while catching up
+ * during the transition to Active.
+ */
+ @Test
+ public void testFailoverWithAbnormalJN() throws Exception {
+ cluster.transitionToActive(0);
+ cluster.waitActive(0);
+
+ BlockManagerFaultInjector.instance = new BlockManagerFaultInjector() {
+ @Override
+ public void mockJNStreams() throws IOException {
+ spyOnJASjournal();
+ }
+ };
Review Comment:
Instead of having to add a new `BlockManagerFaultInjector` method, can we
follow more like the example of `TestQuorumJournalManager` where we set up a
spy that allows the write calls to pass through, then we mock what we need?
We would replace this part with something like:
```
JournalSet.JournalAndStream jas = nn1.getNamesystem().getEditLogTailer()
.getEditLog().getJournalSet().getAllJournalStreams().get(0);
JournalManager oldManager = jas.getManager();
jas.setJournalForTests(SpyQJournalUtil.createSpyingQJM(....));
```
Or am I missing something for why we have to initialize the spy _after_
`recoverUnfinalizedSegments()` instead of before the failover?
##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java:
##########
@@ -299,33 +299,28 @@ public void catchupDuringFailover() throws IOException {
// Important to do tailing as the login user, in case the shared
// edits storage is implemented by a JournalManager that depends
// on security credentials to access the logs (eg QuorumJournalManager).
- SecurityUtil.doAsLoginUser(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- long editsTailed = 0;
- // Fully tail the journal to the end
- do {
- long startTime = timer.monotonicNow();
- try {
- NameNode.getNameNodeMetrics().addEditLogTailInterval(
- startTime - lastLoadTimeMs);
- // It is already under the name system lock and the checkpointer
- // thread is already stopped. No need to acquire any other lock.
- editsTailed = doTailEdits();
- } catch (InterruptedException e) {
- throw new IOException(e);
- } finally {
- NameNode.getNameNodeMetrics().addEditLogTailTime(
- timer.monotonicNow() - startTime);
- }
- } while(editsTailed > 0);
- return null;
+ SecurityUtil.doAsLoginUser((PrivilegedExceptionAction<Void>) () -> {
+ long startTime = timer.monotonicNow();
+ try {
+ NameNode.getNameNodeMetrics().addEditLogTailInterval((startTime -
lastLoadTimeMs));
+ // It is already under the name system lock and the checkpointer
+ // thread is already stopped. No need to acquire any other lock.
+ doTailEdits(false);
Review Comment:
please add a comment explaining why we disable in-progress edits here
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHAWithInProgressTail.java:
##########
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.qjournal.client.SpyQJournalUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerFaultInjector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static
org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getFileInfo;
+import static org.junit.Assert.assertNotNull;
+
+public class TestHAWithInProgressTail {
+ private MiniQJMHACluster qjmhaCluster;
+ private MiniDFSCluster cluster;
+ private MiniJournalCluster jnCluster;
+ private NameNode nn0;
+ private NameNode nn1;
+
+ @Before
+ public void startUp() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
+ conf.setInt(DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY,
500);
+ HAUtil.setAllowStandbyReads(conf, true);
+ qjmhaCluster = new MiniQJMHACluster.Builder(conf).build();
+ cluster = qjmhaCluster.getDfsCluster();
+ jnCluster = qjmhaCluster.getJournalCluster();
+
+ // Get NameNode from cluster to future manual control
+ nn0 = cluster.getNameNode(0);
+ nn1 = cluster.getNameNode(1);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (qjmhaCluster != null) {
+ qjmhaCluster.shutdown();
+ }
+ }
+
+
+ /**
+ * Test that Standby Node tails multiple segments while catching up
+ * during the transition to Active.
+ */
+ @Test
+ public void testFailoverWithAbnormalJN() throws Exception {
+ cluster.transitionToActive(0);
+ cluster.waitActive(0);
+
+ BlockManagerFaultInjector.instance = new BlockManagerFaultInjector() {
+ @Override
+ public void mockJNStreams() throws IOException {
+ spyOnJASjournal();
+ }
+ };
+
+ // Stop EditlogTailer in Standby NameNode.
+ cluster.getNameNode(1).getNamesystem().getEditLogTailer().stop();
+
+ String p = "/testFailoverWhileTailingWithoutCache/";
+ mkdirs(nn0, p + 0, p + 1, p + 2, p + 3, p + 4);
+ mkdirs(nn0, p + 5, p + 6, p + 7, p + 8, p + 9);
+ mkdirs(nn0, p + 10, p + 11, p + 12, p + 13, p + 14);
+
+ cluster.transitionToStandby(0);
+
+ cluster.transitionToActive(1);
+
+ // we should read them in nn1.
+ waitForFileInfo(nn1, p + 0, p + 1, p + 14);
+ }
+
+ private void spyOnJASjournal() throws IOException {
+ JournalSet.JournalAndStream jas = nn1.getNamesystem().getEditLogTailer()
+ .getEditLog().getJournalSet().getAllJournalStreams().get(0);
+
+ JournalManager oldManager = jas.getManager();
+ oldManager.close();
+
+ // Create a SpyingQJM
+ QuorumJournalManager manager =
SpyQJournalUtil.createSpyingQJM(nn1.getConf(),
+ jnCluster.getQuorumJournalURI("ns1"),
+ nn1.getNamesystem().getNamespaceInfo(), "ns1");
+ manager.recoverUnfinalizedSegments();
+ jas.setJournalForTests(manager);
+
+ // First JournalNode with an empty response.
+ SpyQJournalUtil.mockOneJNReturnEmptyResponse(manager, 1L, 0);
+ // Second JournalNode with a slow response.
+ SpyQJournalUtil.mockOneJNWithSlowResponse(manager, 1L, 3000, 1);
+ }
+
+ /**
+ * Create the given directories on the provided NameNode.
+ */
+ private static void mkdirs(NameNode nameNode, String... dirNames)
+ throws Exception {
+ for (String dirName : dirNames) {
+ nameNode.getRpcServer().mkdirs(dirName,
+ FsPermission.createImmutable((short) 0755), true);
+ }
+ }
+
+ /**
+ * Wait up to 1 second until the given NameNode is aware of the existing of
+ * all of the provided fileNames.
+ */
+ private static void waitForFileInfo(NameNode nn, String... fileNames)
+ throws Exception {
Review Comment:
name/Javadoc doesn't match the impl -- there is no waiting here
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHAWithInProgressTail.java:
##########
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.qjournal.client.SpyQJournalUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerFaultInjector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static
org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getFileInfo;
+import static org.junit.Assert.assertNotNull;
+
+public class TestHAWithInProgressTail {
+ private MiniQJMHACluster qjmhaCluster;
+ private MiniDFSCluster cluster;
+ private MiniJournalCluster jnCluster;
+ private NameNode nn0;
+ private NameNode nn1;
+
+ @Before
+ public void startUp() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
+ conf.setInt(DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY,
500);
+ HAUtil.setAllowStandbyReads(conf, true);
+ qjmhaCluster = new MiniQJMHACluster.Builder(conf).build();
+ cluster = qjmhaCluster.getDfsCluster();
+ jnCluster = qjmhaCluster.getJournalCluster();
+
+ // Get NameNode from cluster to future manual control
+ nn0 = cluster.getNameNode(0);
+ nn1 = cluster.getNameNode(1);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (qjmhaCluster != null) {
+ qjmhaCluster.shutdown();
+ }
+ }
+
+
+ /**
+ * Test that Standby Node tails multiple segments while catching up
+ * during the transition to Active.
+ */
+ @Test
+ public void testFailoverWithAbnormalJN() throws Exception {
+ cluster.transitionToActive(0);
+ cluster.waitActive(0);
+
+ BlockManagerFaultInjector.instance = new BlockManagerFaultInjector() {
+ @Override
+ public void mockJNStreams() throws IOException {
+ spyOnJASjournal();
+ }
+ };
+
+ // Stop EditlogTailer in Standby NameNode.
+ cluster.getNameNode(1).getNamesystem().getEditLogTailer().stop();
+
+ String p = "/testFailoverWhileTailingWithoutCache/";
+ mkdirs(nn0, p + 0, p + 1, p + 2, p + 3, p + 4);
+ mkdirs(nn0, p + 5, p + 6, p + 7, p + 8, p + 9);
+ mkdirs(nn0, p + 10, p + 11, p + 12, p + 13, p + 14);
Review Comment:
why so many directories? just 1 should be sufficient, right?
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/SpyQJournalUtil.java:
##########
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import org.apache.hadoop.conf.Configuration;
+import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
+import
org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
+import
org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
+import
org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.eq;
+
+/**
+ * One Util class to mock QJuournals for some UTs not in this package.
Review Comment:
typo: `QJuournals` (maybe just say `QuorumJournalManager` or QJM)
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/SpyQJournalUtil.java:
##########
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import org.apache.hadoop.conf.Configuration;
+import
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
+import
org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
+import
org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
+import
org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.eq;
+
+/**
+ * One Util class to mock QJuournals for some UTs not in this package.
+ */
+public final class SpyQJournalUtil {
+
+ private SpyQJournalUtil() {
+ }
+
+ /**
+ * Mock a QuorumJournalManager with input uri, nsInfo and namServiceId.
+ * @param conf input configuration.
+ * @param uri input uri.
+ * @param nsInfo input nameservice info.
+ * @param nameServiceId input nameservice Id.
+ * @return one mocked QuorumJournalManager.
+ * @throws IOException throw IOException.
+ */
+ public static QuorumJournalManager createSpyingQJM(Configuration conf,
+ URI uri, NamespaceInfo nsInfo, String nameServiceId) throws IOException {
+ AsyncLogger.Factory spyFactory = (conf1, nsInfo1, journalId1,
nameServiceId1, addr1) -> {
+ AsyncLogger logger = new IPCLoggerChannel(conf1, nsInfo1, journalId1,
nameServiceId1, addr1);
+ return Mockito.spy(logger);
+ };
+ return new QuorumJournalManager(conf, uri, nsInfo, nameServiceId,
spyFactory);
+ }
+
+ /**
+ * Try to mock one abnormal JournalNode with one empty response
+ * for getJournaledEdits rpc with startTxid.
+ * @param manager QuorumJournalmanager.
+ * @param startTxid input StartTxid.
+ */
+ public static void mockOneJNReturnEmptyResponse(
+ QuorumJournalManager manager, long startTxid, int journalIndex) {
+ List<AsyncLogger> spies =
manager.getLoggerSetForTests().getLoggersForTests();
+
+ // Mock JN0 return an empty response.
+ GetJournaledEditsResponseProto responseProto =
GetJournaledEditsResponseProto
+ .newBuilder().setTxnCount(journalIndex).build();
+ ListenableFuture<GetJournaledEditsResponseProto> ret =
Futures.immediateFuture(responseProto);
+ Mockito.doReturn(ret).when(spies.get(journalIndex))
+ .getJournaledEdits(eq(startTxid),
eq(QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT));
+ }
+
+ /**
+ * Try to mock one abnormal JournalNode with slow response for
+ * getJournaledEdits rpc with startTxid.
+ * @param manager input QuormJournalManager.
+ * @param startTxid input start txid.
+ * @param sleepTime sleep time.
+ * @param journalIndex the journal index need to be mocked.
+ */
+ public static void mockOneJNWithSlowResponse(
+ QuorumJournalManager manager, long startTxid, int sleepTime, int
journalIndex) {
+ List<AsyncLogger> spies =
manager.getLoggerSetForTests().getLoggersForTests();
+
+ ListeningExecutorService service = MoreExecutors.listeningDecorator(
+ Executors.newSingleThreadExecutor());
+ Mockito.doAnswer(invocation -> service.submit(() -> {
+ Thread.sleep(sleepTime);
Review Comment:
Same [comment as on
HDFS-16659](https://github.com/apache/hadoop/pull/4560#discussion_r955289672),
we shouldn't rely on timing, better to use proper synchronization
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]