xkrogen commented on code in PR #4560:
URL: https://github.com/apache/hadoop/pull/4560#discussion_r955266389


##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java:
##########
@@ -750,10 +750,13 @@ public GetJournaledEditsResponseProto getJournaledEdits(long sinceTxId,
           "is a requirement to fetch journaled edits via RPC. Please enable " +
           "it via " + DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY);
     }
-    if (sinceTxId > getHighestWrittenTxId()) {
-      // Requested edits that don't exist yet; short-circuit the cache here
-      metrics.rpcEmptyResponses.incr();
-      return GetJournaledEditsResponseProto.newBuilder().setTxnCount(0).build();
+    long highestTxId = getHighestWrittenTxId();
+    if (sinceTxId > highestTxId) {
+      // Requested edits that don't exist yet and is newer than highestTxId.
+      throw new NewerTxnIdException(
+          "Highest txn ID available in the journal is %d, but requested txns " +
+              "staring at %d. Maybe the journal is not healthy, just skip it.",

Review Comment:
   typo: `staring` -> `starting`
   
   We expect this to happen more frequently when the JNs are healthy than when they are unhealthy, right? Maybe we should remove the part about "Maybe the journal is not healthy"?
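   A minimal sketch of the check with the typo fixed and the health remark dropped. It assumes `NewerTxnIdException` is an `IOException` whose constructor takes a `String.format`-style pattern plus arguments, as the `%d` placeholders in the diff suggest. The stand-in exception class and the `checkSinceTxId` helper are hypothetical and model only this one branch of `getJournaledEdits`.

```java
import java.io.IOException;

public class NewerTxnIdCheck {
  /** Hypothetical stand-in for the PR's NewerTxnIdException (format string + args assumed). */
  static class NewerTxnIdException extends IOException {
    NewerTxnIdException(String msgFormat, Object... msgArgs) {
      super(String.format(msgFormat, msgArgs));
    }
  }

  /** Hypothetical helper: rejects requests for txns newer than the highest one written. */
  static void checkSinceTxId(long sinceTxId, long highestTxId)
      throws NewerTxnIdException {
    if (sinceTxId > highestTxId) {
      // Per the review, this is expected more often on healthy JNs, so the
      // message states only the txn IDs and does not guess at JN health.
      throw new NewerTxnIdException(
          "Highest txn ID available in the journal is %d, but requested txns "
              + "starting at %d.",
          highestTxId, sinceTxId);
    }
  }

  public static void main(String[] args) {
    try {
      checkSinceTxId(21, 10);
    } catch (NewerTxnIdException e) {
      // Prints: Highest txn ID available in the journal is 10, but requested txns starting at 21.
      System.out.println(e.getMessage());
    }
  }
}
```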



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournaledEditsCache.java:
##########
@@ -413,5 +413,4 @@ long getCacheMissAmount() {
     }
 
   }
-

Review Comment:
   Can you undo this whitespace change, please?



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java:
##########
@@ -53,6 +63,8 @@
 import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
 import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import
+    org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;

Review Comment:
   Leave it on one line; we ignore the line length limit for imports.



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java:
##########
@@ -1101,6 +1113,59 @@ public void testSelectViaRpcTwoJNsError() throws Exception {
     }
   }
 
+  /**
+   * Test selecting EditLogInputStream after some journalNode jitter.
+   * And the corner case as below:
+   * 1. Journal 0 has some abnormal cases when journaling Edits with start txId 11.
+   * 2. NameNode just ignore the abnormal journal 0 and continue to write Edits to Journal 1 and 2.
+   * 3. Journal 0 backed to health.
+   * 4. Observer NameNode try to select EditLogInputStream vis PRC with start txId 21.
+   * 5. Journal 1 has some abnormal cases caused slow response.
+   *
+   * And the expected selecting result is: Response should contain 20 Edits from txId 21 to txId 40.
+   * Because there is no Edits from id 21 to 40 in the cache of JournalNode0.
+   */
+  @Test
+  public void testSelectViaRpcAfterJNJitter() throws Exception {

Review Comment:
   This test passes even if I revert your production changes from `Journal`; can you check again?



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java:
##########
@@ -1101,6 +1113,59 @@ public void testSelectViaRpcTwoJNsError() throws Exception {
     }
   }
 
+  /**
+   * Test selecting EditLogInputStream after some journalNode jitter.
+   * And the corner case as below:
+   * 1. Journal 0 has some abnormal cases when journaling Edits with start txId 11.
+   * 2. NameNode just ignore the abnormal journal 0 and continue to write Edits to Journal 1 and 2.
+   * 3. Journal 0 backed to health.
+   * 4. Observer NameNode try to select EditLogInputStream vis PRC with start txId 21.
+   * 5. Journal 1 has some abnormal cases caused slow response.
+   *
+   * And the expected selecting result is: Response should contain 20 Edits from txId 21 to txId 40.
+   * Because there is no Edits from id 21 to 40 in the cache of JournalNode0.
+   */
+  @Test
+  public void testSelectViaRpcAfterJNJitter() throws Exception {
+    EditLogOutputStream stm = qjm.startLogSegment(
+        1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    SettableFuture<Void> slowLog = SettableFuture.create();
+    Mockito.doReturn(slowLog).when(spies.get(0))
+        .sendEdits(eq(1L), eq(11L), eq(1), Mockito.any());
+    writeTxns(stm, 1, 10);
+    writeTxns(stm, 11, 10);
+    writeTxns(stm, 21, 10);
+    writeTxns(stm, 31, 10);
+    ListeningExecutorService service = MoreExecutors.listeningDecorator(
+        Executors.newSingleThreadExecutor());

Review Comment:
   Why do we need `ListeningExecutorService`? Can't we just directly use `Executors.newSingleThreadExecutor()`?



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java:
##########
@@ -1101,6 +1113,59 @@ public void testSelectViaRpcTwoJNsError() throws Exception {
     }
   }
 
+  /**
+   * Test selecting EditLogInputStream after some journalNode jitter.
+   * And the corner case as below:
+   * 1. Journal 0 has some abnormal cases when journaling Edits with start txId 11.
+   * 2. NameNode just ignore the abnormal journal 0 and continue to write Edits to Journal 1 and 2.
+   * 3. Journal 0 backed to health.
+   * 4. Observer NameNode try to select EditLogInputStream vis PRC with start txId 21.
+   * 5. Journal 1 has some abnormal cases caused slow response.
+   *
+   * And the expected selecting result is: Response should contain 20 Edits from txId 21 to txId 40.
+   * Because there is no Edits from id 21 to 40 in the cache of JournalNode0.
+   */
+  @Test
+  public void testSelectViaRpcAfterJNJitter() throws Exception {
+    EditLogOutputStream stm = qjm.startLogSegment(
+        1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    SettableFuture<Void> slowLog = SettableFuture.create();
+    Mockito.doReturn(slowLog).when(spies.get(0))
+        .sendEdits(eq(1L), eq(11L), eq(1), Mockito.any());
+    writeTxns(stm, 1, 10);
+    writeTxns(stm, 11, 10);
+    writeTxns(stm, 21, 10);
+    writeTxns(stm, 31, 10);
+    ListeningExecutorService service = MoreExecutors.listeningDecorator(
+        Executors.newSingleThreadExecutor());
+    Mockito.doAnswer(invocation -> service.submit(
+        () -> {
+          ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+          EditLogFileOutputStream.writeHeader(
+              NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION,
+              new DataOutputStream(byteStream));
+          byteStream.write(createTxnData(21, 20));
+          Thread.sleep(3000);

Review Comment:
   We shouldn't rely on sleep/timing for this: it slows the test down and is brittle (for example, it can break if the machine running the test is slow for a while). Instead we should use synchronization mechanisms. The approach that jumps out at me would be:
   1. Create a semaphore with no permits.
   2. Intercept the `getJournaledEdits` calls to JN0 and JN2 and have them `release()` the semaphore before calling `invocation.callRealMethod()`.
   3. Call `semaphore.acquire(2)` in the mock answer for JN1. This forces it not to respond until the permits have been released by the calls to JN0 and JN2.
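   The three steps above can be modelled in plain `java.util.concurrent`, without Mockito or a `MiniJournalCluster`. This is a minimal, hypothetical sketch: the three lambdas stand in for the stubbed `getJournaledEdits` calls on JN0, JN1 and JN2, and the event strings exist only to make the ordering observable.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

public class SemaphoreGatedResponses {
  /** Simulates the three calls and returns events in the order they happened. */
  static List<String> run() throws InterruptedException, ExecutionException {
    Semaphore called = new Semaphore(0); // Step 1: start with no permits.
    List<String> events = new CopyOnWriteArrayList<>();
    ExecutorService pool = Executors.newFixedThreadPool(3);
    try {
      // Step 3: the slow node (JN1) is submitted first, but it can only
      // respond once both other calls have released a permit.
      Future<Object> jn1 = pool.submit(() -> {
        called.acquire(2);
        events.add("JN1 responded");
        return null;
      });
      // Step 2: the healthy nodes record the call and release a permit; the
      // real stub would then go on to invocation.callRealMethod().
      Future<Object> jn0 = pool.submit(() -> {
        events.add("JN0 called");
        called.release();
        return null;
      });
      Future<Object> jn2 = pool.submit(() -> {
        events.add("JN2 called");
        called.release();
        return null;
      });
      jn0.get();
      jn2.get();
      jn1.get();
      return events;
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(run());
  }
}
```

   Because JN1 waits on permits rather than a fixed `Thread.sleep(3000)`, it answers as soon as both other calls have arrived, however slow the machine is. In the real test, step 2 would presumably live in `Mockito.doAnswer` stubs on the JN0 and JN2 spies.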



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java:
##########
@@ -1101,6 +1113,59 @@ public void testSelectViaRpcTwoJNsError() throws Exception {
     }
   }
 
+  /**
+   * Test selecting EditLogInputStream after some journalNode jitter.
+   * And the corner case as below:
+   * 1. Journal 0 has some abnormal cases when journaling Edits with start txId 11.
+   * 2. NameNode just ignore the abnormal journal 0 and continue to write Edits to Journal 1 and 2.
+   * 3. Journal 0 backed to health.
+   * 4. Observer NameNode try to select EditLogInputStream vis PRC with start txId 21.
+   * 5. Journal 1 has some abnormal cases caused slow response.
+   *
+   * And the expected selecting result is: Response should contain 20 Edits from txId 21 to txId 40.
+   * Because there is no Edits from id 21 to 40 in the cache of JournalNode0.
+   */
+  @Test
+  public void testSelectViaRpcAfterJNJitter() throws Exception {
+    EditLogOutputStream stm = qjm.startLogSegment(
+        1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    SettableFuture<Void> slowLog = SettableFuture.create();
+    Mockito.doReturn(slowLog).when(spies.get(0))
+        .sendEdits(eq(1L), eq(11L), eq(1), Mockito.any());
+    writeTxns(stm, 1, 10);
+    writeTxns(stm, 11, 10);
+    writeTxns(stm, 21, 10);
+    writeTxns(stm, 31, 10);
+    ListeningExecutorService service = MoreExecutors.listeningDecorator(
+        Executors.newSingleThreadExecutor());
+    Mockito.doAnswer(invocation -> service.submit(
+        () -> {
+          ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+          EditLogFileOutputStream.writeHeader(
+              NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION,
+              new DataOutputStream(byteStream));
+          byteStream.write(createTxnData(21, 20));
+          Thread.sleep(3000);
+          return GetJournaledEditsResponseProto.newBuilder()
+              .setTxnCount(20)
+              .setEditLog(ByteString.copyFrom(byteStream.toByteArray()))
+              .build();
+        })
+    ).when(spies.get(1)).getJournaledEdits(21,
+        QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);

Review Comment:
   Why don't we just use `invocation.callRealMethod()` instead of constructing the response here?
   
   ```suggestion
       Mockito.doAnswer(invocation -> service.submit(() -> {
         Thread.sleep(3000);
         try {
           return invocation.callRealMethod();
         } catch (Throwable e) {
           throw (Exception) e;
         }
       })).when(spies.get(1)).getJournaledEdits(21,
           QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
   ```
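   `callRealMethod()` is declared to throw `Throwable`, while `Callable.call()` may only throw `Exception`, which is why the suggestion casts. One caveat: if the real method ever throws an `Error` (an `AssertionError`, say), the `(Exception) e` cast itself fails with a `ClassCastException` and hides the original failure. A sketch of a cast-free adapter; `ThrowingCall` and `asCallable` are hypothetical helpers, not Mockito or JDK APIs.

```java
import java.util.concurrent.Callable;

public class RethrowFromCallable {
  /** Hypothetical interface with the same shape as callRealMethod(): may throw any Throwable. */
  @FunctionalInterface
  interface ThrowingCall<T> {
    T call() throws Throwable;
  }

  /** Hypothetical adapter from a Throwable-throwing call to a Callable, without casting. */
  static <T> Callable<T> asCallable(ThrowingCall<T> body) {
    return () -> {
      try {
        return body.call();
      } catch (Exception | Error e) {
        // Checked exceptions, runtime exceptions and Errors propagate unchanged.
        throw e;
      } catch (Throwable t) {
        // Only direct Throwable subclasses (neither Exception nor Error) get wrapped.
        throw new Exception(t);
      }
    };
  }

  public static void main(String[] args) throws Exception {
    Callable<String> delegating = asCallable(() -> "real result");
    System.out.println(delegating.call()); // prints "real result"
  }
}
```

   Inside the stub, the `try`/`catch` would then reduce to `return invocation.callRealMethod();` within an `asCallable(() -> { ... })` wrapper passed to `service.submit(...)`.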



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java:
##########
@@ -1101,6 +1113,59 @@ public void testSelectViaRpcTwoJNsError() throws Exception {
     }
   }
 
+  /**
+   * Test selecting EditLogInputStream after some journalNode jitter.
+   * And the corner case as below:
+   * 1. Journal 0 has some abnormal cases when journaling Edits with start txId 11.
+   * 2. NameNode just ignore the abnormal journal 0 and continue to write Edits to Journal 1 and 2.
+   * 3. Journal 0 backed to health.
+   * 4. Observer NameNode try to select EditLogInputStream vis PRC with start txId 21.
+   * 5. Journal 1 has some abnormal cases caused slow response.
+   *
+   * And the expected selecting result is: Response should contain 20 Edits from txId 21 to txId 40.
+   * Because there is no Edits from id 21 to 40 in the cache of JournalNode0.
+   */
+  @Test
+  public void testSelectViaRpcAfterJNJitter() throws Exception {
+    EditLogOutputStream stm = qjm.startLogSegment(
+        1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    SettableFuture<Void> slowLog = SettableFuture.create();
+    Mockito.doReturn(slowLog).when(spies.get(0))
+        .sendEdits(eq(1L), eq(11L), eq(1), Mockito.any());
+    writeTxns(stm, 1, 10);
+    writeTxns(stm, 11, 10);
+    writeTxns(stm, 21, 10);
+    writeTxns(stm, 31, 10);
+    ListeningExecutorService service = MoreExecutors.listeningDecorator(
+        Executors.newSingleThreadExecutor());
+    Mockito.doAnswer(invocation -> service.submit(
+        () -> {
+          ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+          EditLogFileOutputStream.writeHeader(
+              NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION,
+              new DataOutputStream(byteStream));
+          byteStream.write(createTxnData(21, 20));
+          Thread.sleep(3000);
+          return GetJournaledEditsResponseProto.newBuilder()
+              .setTxnCount(20)
+              .setEditLog(ByteString.copyFrom(byteStream.toByteArray()))
+              .build();
+        })
+    ).when(spies.get(1)).getJournaledEdits(21,
+        QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+
+    GetJournaledEditsResponseProto responseProto = spies.get(2)
+        .getJournaledEdits(21, 5000).get();
+    assertEquals(20, responseProto.getTxnCount());

Review Comment:
   Why do we need this part?



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java:
##########
@@ -1101,6 +1113,59 @@ public void testSelectViaRpcTwoJNsError() throws Exception {
     }
   }
 
+  /**
+   * Test selecting EditLogInputStream after some journalNode jitter.
+   * And the corner case as below:
+   * 1. Journal 0 has some abnormal cases when journaling Edits with start txId 11.
+   * 2. NameNode just ignore the abnormal journal 0 and continue to write Edits to Journal 1 and 2.
+   * 3. Journal 0 backed to health.
+   * 4. Observer NameNode try to select EditLogInputStream vis PRC with start txId 21.
+   * 5. Journal 1 has some abnormal cases caused slow response.
+   *
+   * And the expected selecting result is: Response should contain 20 Edits from txId 21 to txId 40.
+   * Because there is no Edits from id 21 to 40 in the cache of JournalNode0.

Review Comment:
   1. The line length is too long here.
   2. Typo: `vis PRC` -> `via RPC`
   3. Let's make the example a little clearer. We say the expected result should contain txns 21-40, but we never talk about writing up to txn 40. You have a good description in the PR.



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java:
##########
@@ -1101,6 +1113,59 @@ public void testSelectViaRpcTwoJNsError() throws Exception {
     }
   }
 
+  /**
+   * Test selecting EditLogInputStream after some journalNode jitter.
+   * And the corner case as below:
+   * 1. Journal 0 has some abnormal cases when journaling Edits with start txId 11.
+   * 2. NameNode just ignore the abnormal journal 0 and continue to write Edits to Journal 1 and 2.
+   * 3. Journal 0 backed to health.
+   * 4. Observer NameNode try to select EditLogInputStream vis PRC with start txId 21.
+   * 5. Journal 1 has some abnormal cases caused slow response.
+   *
+   * And the expected selecting result is: Response should contain 20 Edits from txId 21 to txId 40.
+   * Because there is no Edits from id 21 to 40 in the cache of JournalNode0.
+   */
+  @Test
+  public void testSelectViaRpcAfterJNJitter() throws Exception {
+    EditLogOutputStream stm = qjm.startLogSegment(
+        1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    SettableFuture<Void> slowLog = SettableFuture.create();
+    Mockito.doReturn(slowLog).when(spies.get(0))
+        .sendEdits(eq(1L), eq(11L), eq(1), Mockito.any());
+    writeTxns(stm, 1, 10);
+    writeTxns(stm, 11, 10);
+    writeTxns(stm, 21, 10);
+    writeTxns(stm, 31, 10);

Review Comment:
   Isn't just one `writeTxns` call sufficient to trigger the issue? Why do we need four?



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java:
##########
@@ -750,10 +750,13 @@ public GetJournaledEditsResponseProto getJournaledEdits(long sinceTxId,
           "is a requirement to fetch journaled edits via RPC. Please enable " +
           "it via " + DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY);
     }
-    if (sinceTxId > getHighestWrittenTxId()) {
-      // Requested edits that don't exist yet; short-circuit the cache here
-      metrics.rpcEmptyResponses.incr();

Review Comment:
   We should keep the `metrics.rpcEmptyResponses.incr()` call here.
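   A sketch of keeping the counter while switching from an empty response to an exception. Here `rpcEmptyResponses` is an `AtomicLong` standing in for the `metrics.rpcEmptyResponses` counter in `Journal`, a plain `IOException` stands in for the PR's `NewerTxnIdException`, and `checkRequestedTxns` is a hypothetical helper modelling only this branch.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

public class EmptyResponseMetricSketch {
  // Stand-in for metrics.rpcEmptyResponses in Journal.
  static final AtomicLong rpcEmptyResponses = new AtomicLong();

  /**
   * Hypothetical helper modelling only the "requested edits don't exist yet"
   * branch of getJournaledEdits. A plain IOException stands in for the PR's
   * NewerTxnIdException.
   */
  static void checkRequestedTxns(long sinceTxId, long highestWrittenTxId)
      throws IOException {
    if (sinceTxId > highestWrittenTxId) {
      // The caller still receives no edits, so count it as an empty response
      // before throwing, as the removed short-circuit path did.
      rpcEmptyResponses.incrementAndGet();
      throw new IOException("Requested txns starting at " + sinceTxId
          + ", but the highest written txn is " + highestWrittenTxId);
    }
    // Otherwise the real method would go on to serve edits from the cache.
  }

  public static void main(String[] args) {
    try {
      checkRequestedTxns(21, 10);
    } catch (IOException e) {
      System.out.println(e.getMessage());
    }
    System.out.println("rpcEmptyResponses = " + rpcEmptyResponses.get());
  }
}
```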



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java:
##########
@@ -1101,6 +1113,59 @@ public void testSelectViaRpcTwoJNsError() throws Exception {
     }
   }
 
+  /**
+   * Test selecting EditLogInputStream after some journalNode jitter.
+   * And the corner case as below:
+   * 1. Journal 0 has some abnormal cases when journaling Edits with start txId 11.
+   * 2. NameNode just ignore the abnormal journal 0 and continue to write Edits to Journal 1 and 2.
+   * 3. Journal 0 backed to health.
+   * 4. Observer NameNode try to select EditLogInputStream vis PRC with start txId 21.
+   * 5. Journal 1 has some abnormal cases caused slow response.
+   *
+   * And the expected selecting result is: Response should contain 20 Edits from txId 21 to txId 40.
+   * Because there is no Edits from id 21 to 40 in the cache of JournalNode0.
+   */
+  @Test
+  public void testSelectViaRpcAfterJNJitter() throws Exception {

Review Comment:
   It's probably because you only make JN0 miss txns 1-10, but not 11-40, so I think it would throw `CacheMissException` right now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
