This is an automated email from the ASF dual-hosted git repository.

markt-asf pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/main by this push:
     new 6218ec82db Add replay protection to the EncryptInterceptor
6218ec82db is described below

commit 6218ec82db2f243d4e0c9c0df1808604c764f4db
Author: Mark Thomas <[email protected]>
AuthorDate: Wed Jun 17 13:44:02 2026 +0100

    Add replay protection to the EncryptInterceptor
---
 .../group/interceptors/EncryptInterceptor.java     | 133 +++++++++-
 .../interceptors/EncryptInterceptorMBean.java      |  14 +
 .../group/interceptors/LocalStrings.properties     |   1 +
 .../apache/catalina/tribes/util/CyclicTracker.java | 119 +++++++++
 .../group/interceptors/TestEncryptInterceptor.java | 282 +++++++++++++++++----
 .../catalina/tribes/util/TestCyclicTracker.java    | 129 ++++++++++
 webapps/docs/changelog.xml                         |   3 +
 webapps/docs/config/cluster-interceptor.xml        |  14 +-
 8 files changed, 625 insertions(+), 70 deletions(-)

diff --git 
a/java/org/apache/catalina/tribes/group/interceptors/EncryptInterceptor.java 
b/java/org/apache/catalina/tribes/group/interceptors/EncryptInterceptor.java
index 5cc9359a5d..8cee5b468c 100644
--- a/java/org/apache/catalina/tribes/group/interceptors/EncryptInterceptor.java
+++ b/java/org/apache/catalina/tribes/group/interceptors/EncryptInterceptor.java
@@ -23,7 +23,10 @@ import java.security.NoSuchProviderException;
 import java.security.SecureRandom;
 import java.security.spec.AlgorithmParameterSpec;
 import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.crypto.Cipher;
 import javax.crypto.NoSuchPaddingException;
@@ -39,6 +42,7 @@ import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.group.ChannelInterceptorBase;
 import org.apache.catalina.tribes.group.InterceptorPayload;
 import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.util.CyclicTracker;
 import org.apache.catalina.tribes.util.StringManager;
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
@@ -63,6 +67,7 @@ public class EncryptInterceptor extends 
ChannelInterceptorBase implements Encryp
     private String encryptionAlgorithm = DEFAULT_ENCRYPTION_ALGORITHM;
     private byte[] encryptionKeyBytes;
     private String encryptionKeyString;
+    private int replayWindowSize = 1024;
 
 
     private BaseEncryptionManager encryptionManager;
@@ -80,7 +85,7 @@ public class EncryptInterceptor extends 
ChannelInterceptorBase implements Encryp
         if (Channel.SND_TX_SEQ == (svc & Channel.SND_TX_SEQ)) {
             try {
                 encryptionManager = 
createEncryptionManager(getEncryptionAlgorithm(), getEncryptionKeyInternal(),
-                        getProviderName());
+                        getProviderName(), getReplayWindowSize());
             } catch (GeneralSecurityException gse) {
                 throw new 
ChannelException(sm.getString("encryptInterceptor.init.failed"), gse);
             }
@@ -114,9 +119,12 @@ public class EncryptInterceptor extends 
ChannelInterceptorBase implements Encryp
             throws ChannelException {
         try {
             byte[] data = msg.getMessage().getBytes();
+            byte[] message = new byte[data.length + 8];
+            
XByteBuffer.toBytes(encryptionManager.getAndIncrementMessageNumber(), message, 
0);
+            System.arraycopy(data, 0, message, 8, data.length);
 
             // See #encrypt(byte[]) for an explanation of the return value
-            byte[][] bytes = encryptionManager.encrypt(data);
+            byte[][] bytes = encryptionManager.encrypt(message);
 
             XByteBuffer xbb = msg.getMessage();
 
@@ -139,12 +147,19 @@ public class EncryptInterceptor extends 
ChannelInterceptorBase implements Encryp
             byte[] data = msg.getMessage().getBytes();
 
             data = encryptionManager.decrypt(data);
+            if (data.length < 8) {
+                throw new 
GeneralSecurityException(sm.getString("encryptInterceptor.decrypt.error.short-message"));
+            }
+            if 
(!encryptionManager.checkIncomingMessageNumber(msg.getAddress(), 
XByteBuffer.toLong(data, 0))) {
+                log.error(sm.getString("encryptInterceptor.decrypt.replay"));
+                return;
+            }
 
             XByteBuffer xbb = msg.getMessage();
 
             // Completely replace the message with the decrypted one
             xbb.clear();
-            xbb.append(data, 0, data.length);
+            xbb.append(data, 8, data.length - 8);
 
             super.messageReceived(msg);
         } catch (GeneralSecurityException gse) {
@@ -152,6 +167,14 @@ public class EncryptInterceptor extends 
ChannelInterceptorBase implements Encryp
         }
     }
 
+    @Override
+    public void memberDisappeared(Member member) {
+        if (encryptionManager != null) {
+            encryptionManager.memberDisappeared(member);
+        }
+        super.memberDisappeared(member);
+    }
+
     /**
      * Sets the encryption algorithm to be used for encrypting and decrypting 
channel messages. You must specify the
      * <code>algorithm/mode/padding</code>. Information on standard algorithm 
names may be found in the
@@ -274,6 +297,36 @@ public class EncryptInterceptor extends 
ChannelInterceptorBase implements Encryp
         return providerName;
     }
 
+    /**
+     * Returns the number of message sequence numbers remembered for replay 
detection.
+     *
+     * @return The replay window size
+     */
+    @Override
+    public int getReplayWindowSize() {
+        return replayWindowSize;
+    }
+
+    /**
+     * Sets the number of message sequence numbers remembered for replay 
detection.
+     *
+     * @param replayWindowSize The replay window size
+     */
+    @Override
+    public void setReplayWindowSize(int replayWindowSize) {
+        if (replayWindowSize < 1) {
+            throw new IllegalArgumentException("replayWindowSize must be 
greater than zero");
+        }
+        this.replayWindowSize = replayWindowSize;
+    }
+
+    Long getRemovedMemberHeadValue(Member member) {
+        if (encryptionManager == null) {
+            return null;
+        }
+        return encryptionManager.getRemovedMemberHeadValue(member);
+    }
+
     // Copied from org.apache.tomcat.util.buf.HexUtils
     // @formatter:off
     private static final int[] DEC = {
@@ -320,7 +373,8 @@ public class EncryptInterceptor extends 
ChannelInterceptorBase implements Encryp
     }
 
     private static BaseEncryptionManager createEncryptionManager(String 
algorithm, byte[] encryptionKey,
-            String providerName) throws NoSuchAlgorithmException, 
NoSuchPaddingException, NoSuchProviderException {
+            String providerName, int replayWindowSize)
+            throws NoSuchAlgorithmException, NoSuchPaddingException, 
NoSuchProviderException {
         if (null == encryptionKey) {
             throw new 
IllegalStateException(sm.getString("encryptInterceptor.key.required"));
         }
@@ -359,8 +413,7 @@ public class EncryptInterceptor extends 
ChannelInterceptorBase implements Encryp
          */
         if ("NONE".equals(algorithmMode) || "ECB".equals(algorithmMode) || 
"PCBC".equals(algorithmMode) ||
                 "CTS".equals(algorithmMode) || "KW".equals(algorithmMode) || 
"KWP".equals(algorithmMode) ||
-                "CTR".equals(algorithmMode) ||
-                ("CBC".equals(algorithmMode) && 
"NOPADDING".equals(algorithmPadding)) ||
+                "CTR".equals(algorithmMode) || ("CBC".equals(algorithmMode) && 
"NOPADDING".equals(algorithmPadding)) ||
                 ("CFB".equals(algorithmMode) && 
"NOPADDING".equals(algorithmPadding)) ||
                 ("GCM".equals(algorithmMode) && 
"PKCS5PADDING".equals(algorithmPadding)) ||
                 ("OFB".equals(algorithmMode) && 
"NOPADDING".equals(algorithmPadding))) {
@@ -375,17 +428,18 @@ public class EncryptInterceptor extends 
ChannelInterceptorBase implements Encryp
 
         } else if (algorithmMode.startsWith("CFB") || 
algorithmMode.startsWith("OFB")) {
             // Using a non-default block size. Not supported as insecure 
and/or inefficient.
-            throw new IllegalArgumentException(
-                    sm.getString("encryptInterceptor.algorithm.unsupported", 
algorithm));
+            throw new 
IllegalArgumentException(sm.getString("encryptInterceptor.algorithm.unsupported",
 algorithm));
 
         } else if ("GCM".equals(algorithmMode) && 
"NOPADDING".equals(algorithmPadding)) {
             // Needs a specialised encryption manager to handle the 
differences between GCM and other modes
-            return new GCMEncryptionManager(algorithm, new 
SecretKeySpec(encryptionKey, algorithmName), providerName);
+            return new GCMEncryptionManager(algorithm, new 
SecretKeySpec(encryptionKey, algorithmName), providerName,
+                    replayWindowSize);
         }
 
         // Use the default encryption manager
         try {
-            return new BaseEncryptionManager(algorithm, new 
SecretKeySpec(encryptionKey, algorithmName), providerName);
+            return new BaseEncryptionManager(algorithm, new 
SecretKeySpec(encryptionKey, algorithmName), providerName,
+                    replayWindowSize);
         } catch (NoSuchAlgorithmException | NoSuchPaddingException | 
NoSuchProviderException ex) {
             throw new 
IllegalArgumentException(sm.getString("encryptInterceptor.algorithm.unsupported",
 algorithm), ex);
         }
@@ -423,24 +477,77 @@ public class EncryptInterceptor extends 
ChannelInterceptorBase implements Encryp
          * SecureRandom is thread-safe, but sharing a single instance will 
likely be a bottleneck.
          */
         private final ConcurrentLinkedQueue<SecureRandom> randomPool;
+        private final AtomicLong messageNumberGenerator = new AtomicLong();
+        private final Map<Member,CyclicTracker> receivedMessageNumbersByMember 
= new ConcurrentHashMap<>();
+        private final Map<Member,Long> messageNumbersByRemovedMember = new 
ConcurrentHashMap<>();
+        private final CyclicTracker receivedMessageNumbersForUnknownSender;
+        private final int replayWindowSize;
 
-        BaseEncryptionManager(String algorithm, SecretKeySpec secretKey, 
String providerName)
+        BaseEncryptionManager(String algorithm, SecretKeySpec secretKey, 
String providerName, int replayWindowSize)
                 throws NoSuchAlgorithmException, NoSuchPaddingException, 
NoSuchProviderException {
             this.algorithm = algorithm;
             this.providerName = providerName;
             this.secretKey = secretKey;
+            this.replayWindowSize = replayWindowSize;
 
             cipherPool = new ConcurrentLinkedQueue<>();
             Cipher cipher = createCipher();
             blockSize = cipher.getBlockSize();
             cipherPool.offer(cipher);
             randomPool = new ConcurrentLinkedQueue<>();
+            receivedMessageNumbersForUnknownSender = new 
CyclicTracker(replayWindowSize);
         }
 
         public void shutdown() {
             // Individual Cipher and SecureRandom objects need no explicit 
tear down
             cipherPool.clear();
             randomPool.clear();
+            receivedMessageNumbersByMember.clear();
+            messageNumbersByRemovedMember.clear();
+        }
+
+        public long getAndIncrementMessageNumber() {
+            return messageNumberGenerator.getAndIncrement();
+        }
+
+        public boolean checkIncomingMessageNumber(Member sender, long 
messageNumber) {
+            if (sender == null) {
+                return 
receivedMessageNumbersForUnknownSender.track(messageNumber);
+            }
+            return receivedMessageNumbersByMember.computeIfAbsent(sender, 
this::createTrackerForMember)
+                    .track(messageNumber);
+        }
+
+        public void memberDisappeared(Member member) {
+            CyclicTracker tracker = 
receivedMessageNumbersByMember.remove(member);
+            if (tracker != null) {
+                /*
+                 * There is a security trade off here.
+                 *
+                 * Entries are only removed from this Map if the Member 
reappears. That means there is a potential DoS
+                 * risk due to the growth of this Map. That is considered 
unlikely as only Members with the encryption
+                 * key will be added to this Map and the size of the Map.Entry 
is minimal.
+                 *
+                 * If entries are removed from this Map based either on Map 
size or time, that exposes the risk of a
+                 * replay attack using any message the Member may have 
previously sent.
+                 *
+                 * The replay attack is viewed as the higher risk, hence there 
are no limits on the size of this Map.
+                 */
+                messageNumbersByRemovedMember.put(member, 
Long.valueOf(tracker.getHeadValue()));
+            }
+        }
+
+        public Long getRemovedMemberHeadValue(Member member) {
+            return messageNumbersByRemovedMember.get(member);
+        }
+
+        private CyclicTracker createTrackerForMember(Member member) {
+            CyclicTracker tracker = new CyclicTracker(replayWindowSize);
+            Long headValue = messageNumbersByRemovedMember.remove(member);
+            if (headValue != null) {
+                tracker.track(headValue.longValue());
+            }
+            return tracker;
         }
 
         private String getAlgorithm() {
@@ -611,9 +718,9 @@ public class EncryptInterceptor extends 
ChannelInterceptorBase implements Encryp
      * number of bits supported 128-bit provide the best security.
      */
     private static class GCMEncryptionManager extends BaseEncryptionManager {
-        GCMEncryptionManager(String algorithm, SecretKeySpec secretKey, String 
providerName)
+        GCMEncryptionManager(String algorithm, SecretKeySpec secretKey, String 
providerName, int replayWindowSize)
                 throws NoSuchAlgorithmException, NoSuchPaddingException, 
NoSuchProviderException {
-            super(algorithm, secretKey, providerName);
+            super(algorithm, secretKey, providerName, replayWindowSize);
         }
 
         @Override
diff --git 
a/java/org/apache/catalina/tribes/group/interceptors/EncryptInterceptorMBean.java
 
b/java/org/apache/catalina/tribes/group/interceptors/EncryptInterceptorMBean.java
index 7d10a1f4f1..e6fd2fb4ef 100644
--- 
a/java/org/apache/catalina/tribes/group/interceptors/EncryptInterceptorMBean.java
+++ 
b/java/org/apache/catalina/tribes/group/interceptors/EncryptInterceptorMBean.java
@@ -76,4 +76,18 @@ public interface EncryptInterceptorMBean {
      * @return the JCA provider name, or {@code null} for default
      */
     String getProviderName();
+
+    /**
+     * Returns the number of message sequence numbers remembered for replay 
detection.
+     *
+     * @return the replay window size
+     */
+    int getReplayWindowSize();
+
+    /**
+     * Sets the number of message sequence numbers remembered for replay 
detection.
+     *
+     * @param replayWindowSize the replay window size
+     */
+    void setReplayWindowSize(int replayWindowSize);
 }
diff --git 
a/java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties 
b/java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties
index a8e3c5d6e5..392edf7554 100644
--- a/java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties
+++ b/java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties
@@ -21,6 +21,7 @@ encryptInterceptor.algorithm.switch=The EncryptInterceptor is 
using the algorith
 encryptInterceptor.algorithm.unsupported=EncryptInterceptor does not support 
algorithm [{0}]
 encryptInterceptor.decrypt.error.short-message=Failed to decrypt message: 
premature end-of-message
 encryptInterceptor.decrypt.failed=Failed to decrypt message
+encryptInterceptor.decrypt.replay=Failed to decrypt message: replay attack 
detected
 encryptInterceptor.encrypt.failed=Failed to encrypt message
 encryptInterceptor.init.failed=Failed to initialize EncryptInterceptor
 encryptInterceptor.key.required=Encryption key is required
diff --git a/java/org/apache/catalina/tribes/util/CyclicTracker.java 
b/java/org/apache/catalina/tribes/util/CyclicTracker.java
new file mode 100644
index 0000000000..09336ada78
--- /dev/null
+++ b/java/org/apache/catalina/tribes/util/CyclicTracker.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.util;
+
+/**
+ * Tracks recently observed integers in a fixed-size cyclic window.
+ * <p>
+ * Values are ordered using the natural {@code long} sequence with overflow 
handling so {@code Long.MAX_VALUE} is
+ * followed by {@code Long.MIN_VALUE}.
+ */
+public class CyclicTracker {
+
+    private final boolean[] seen;
+
+    private boolean initialized = false;
+    private long headValue;
+    private int headIndex;
+
+
+    public CyclicTracker(int size) {
+        if (size < 1) {
+            throw new IllegalArgumentException("size must be greater than 
zero");
+        }
+        seen = new boolean[size];
+    }
+
+
+    /**
+     * Tracks the provided value.
+     *
+     * @param value The value to track
+     *
+     * @return {@code true} if the value had not previously been seen and is 
within the current tracking window,
+     *             otherwise {@code false}
+     */
+    public synchronized boolean track(long value) {
+        if (!initialized) {
+            initialized = true;
+            headValue = value;
+            headIndex = 0;
+            seen[0] = true;
+            return true;
+        }
+
+        long behind = headValue - value;
+        if (behind >= 0) {
+            if (behind >= seen.length) {
+                return false;
+            }
+            int index = toIndex(headIndex - (int) behind);
+            if (seen[index]) {
+                return false;
+            }
+            seen[index] = true;
+            return true;
+        }
+
+        advance(distance(headValue, value));
+
+        headValue = value;
+        seen[headIndex] = true;
+        return true;
+    }
+
+    /**
+     * Returns the current head value.
+     *
+     * @return The current head value
+     */
+    public synchronized long getHeadValue() {
+        if (!initialized) {
+            throw new IllegalStateException("No value has been tracked");
+        }
+        return headValue;
+    }
+
+
+    private void advance(long delta) {
+        if (Long.compareUnsigned(delta, seen.length) >= 0) {
+            java.util.Arrays.fill(seen, false);
+            headIndex = toIndex(headIndex + (int) (delta % seen.length));
+            return;
+        }
+
+        for (int i = 1; i <= (int) delta; i++) {
+            int index = toIndex(headIndex + i);
+            seen[index] = false;
+        }
+        headIndex = toIndex(headIndex + (int) delta);
+    }
+
+
+    private int toIndex(int value) {
+        int result = value % seen.length;
+        if (result < 0) {
+            result += seen.length;
+        }
+        return result;
+    }
+
+
+    private long distance(long from, long to) {
+        return to - from;
+    }
+}
diff --git 
a/test/org/apache/catalina/tribes/group/interceptors/TestEncryptInterceptor.java
 
b/test/org/apache/catalina/tribes/group/interceptors/TestEncryptInterceptor.java
index e30e84321f..2c03dc4c38 100644
--- 
a/test/org/apache/catalina/tribes/group/interceptors/TestEncryptInterceptor.java
+++ 
b/test/org/apache/catalina/tribes/group/interceptors/TestEncryptInterceptor.java
@@ -37,13 +37,13 @@ import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.io.ChannelData;
 import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.membership.MemberImpl;
 
 /**
  * Tests the EncryptInterceptor.
- *
- * Many of the tests in this class use strings as input and output, even
- * though the interceptor actually operates on byte arrays. This is done
- * for readability for the tests and their outputs.
+ * <p>
+ * Many of the tests in this class use strings as input and output, even 
though the interceptor actually operates on
+ * byte arrays. This is done for readability for the tests and their outputs.
  */
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class TestEncryptInterceptor extends EncryptionInterceptorBaseTest {
@@ -55,9 +55,7 @@ public class TestEncryptInterceptor extends 
EncryptionInterceptorBaseTest {
 
         String testInput = "The quick brown fox jumps over the lazy dog.";
 
-        Assert.assertEquals("Basic roundtrip failed",
-                     testInput,
-                     roundTrip(testInput, src, dest));
+        Assert.assertEquals("Basic roundtrip failed", testInput, 
roundTrip(testInput, src, dest));
     }
 
     @Test
@@ -67,25 +65,15 @@ public class TestEncryptInterceptor extends 
EncryptionInterceptorBaseTest {
 
         String testInput = "The quick brown fox jumps over the lazy dog.";
 
-        Assert.assertEquals("Basic roundtrip failed",
-                     testInput,
-                     roundTrip(testInput, src, dest));
+        Assert.assertEquals("Basic roundtrip failed", testInput, 
roundTrip(testInput, src, dest));
 
-        Assert.assertEquals("Second roundtrip failed",
-                testInput,
-                roundTrip(testInput, src, dest));
+        Assert.assertEquals("Second roundtrip failed", testInput, 
roundTrip(testInput, src, dest));
 
-        Assert.assertEquals("Third roundtrip failed",
-                testInput,
-                roundTrip(testInput, src, dest));
+        Assert.assertEquals("Third roundtrip failed", testInput, 
roundTrip(testInput, src, dest));
 
-        Assert.assertEquals("Fourth roundtrip failed",
-                testInput,
-                roundTrip(testInput, src, dest));
+        Assert.assertEquals("Fourth roundtrip failed", testInput, 
roundTrip(testInput, src, dest));
 
-        Assert.assertEquals("Fifth roundtrip failed",
-                testInput,
-                roundTrip(testInput, src, dest));
+        Assert.assertEquals("Fifth roundtrip failed", testInput, 
roundTrip(testInput, src, dest));
     }
 
     @Test
@@ -95,9 +83,7 @@ public class TestEncryptInterceptor extends 
EncryptionInterceptorBaseTest {
 
         String testInput = "x";
 
-        Assert.assertEquals("Tiny payload roundtrip failed",
-                     testInput,
-                     roundTrip(testInput, src, dest));
+        Assert.assertEquals("Tiny payload roundtrip failed", testInput, 
roundTrip(testInput, src, dest));
     }
 
     @Test
@@ -105,11 +91,9 @@ public class TestEncryptInterceptor extends 
EncryptionInterceptorBaseTest {
         src.start(Channel.SND_TX_SEQ);
         dest.start(Channel.SND_TX_SEQ);
 
-        byte[] bytes = new byte[1024*1024];
+        byte[] bytes = new byte[1024 * 1024];
 
-        Assert.assertArrayEquals("Huge payload roundtrip failed",
-                          bytes,
-                          roundTrip(bytes, src, dest));
+        Assert.assertArrayEquals("Huge payload roundtrip failed", bytes, 
roundTrip(bytes, src, dest));
     }
 
     @Test
@@ -121,9 +105,7 @@ public class TestEncryptInterceptor extends 
EncryptionInterceptorBaseTest {
 
         String testInput = "The quick brown fox jumps over the lazy dog.";
 
-        Assert.assertEquals("Failed to set custom provider name",
-                     testInput,
-                     roundTrip(testInput, src, dest));
+        Assert.assertEquals("Failed to set custom provider name", testInput, 
roundTrip(testInput, src, dest));
     }
 
     @Test
@@ -138,9 +120,7 @@ public class TestEncryptInterceptor extends 
EncryptionInterceptorBaseTest {
 
         String testInput = "The quick brown fox jumps over the lazy dog.";
 
-        Assert.assertEquals("Failed to set custom provider name",
-                     testInput,
-                     roundTrip(testInput, src, dest));
+        Assert.assertEquals("Failed to set custom provider name", testInput, 
roundTrip(testInput, src, dest));
     }
 
     @Test
@@ -155,9 +135,7 @@ public class TestEncryptInterceptor extends 
EncryptionInterceptorBaseTest {
 
         String testInput = "The quick brown fox jumps over the lazy dog.";
 
-        Assert.assertEquals("Failed to set custom provider name",
-                     testInput,
-                     roundTrip(testInput, src, dest));
+        Assert.assertEquals("Failed to set custom provider name", testInput, 
roundTrip(testInput, src, dest));
     }
 
     @Test
@@ -171,7 +149,7 @@ public class TestEncryptInterceptor extends 
EncryptionInterceptorBaseTest {
         msg.setMessage(new XByteBuffer(testInput.getBytes("UTF-8"), false));
         src.sendMessage(null, msg, null);
 
-        byte[] bytes = ((ValueCaptureInterceptor)src.getNext()).getValue();
+        byte[] bytes = ((ValueCaptureInterceptor) src.getNext()).getValue();
 
         try (FileOutputStream out = new FileOutputStream(MESSAGE_FILE)) {
             out.write(bytes);
@@ -205,21 +183,213 @@ public class TestEncryptInterceptor extends 
EncryptionInterceptorBaseTest {
         msg.setMessage(new XByteBuffer(testInput.getBytes("UTF-8"), false));
         src.sendMessage(null, msg, null);
 
-        byte[] cipherText1 = 
((ValueCaptureInterceptor)src.getNext()).getValue();
+        byte[] cipherText1 = ((ValueCaptureInterceptor) 
src.getNext()).getValue();
+
+        msg.setMessage(new XByteBuffer(testInput.getBytes("UTF-8"), false));
+        src.sendMessage(null, msg, null);
+
+        byte[] cipherText2 = ((ValueCaptureInterceptor) 
src.getNext()).getValue();
+
+        MatcherAssert.assertThat("Two identical cleartexts encrypt to the same 
ciphertext", cipherText1,
+                IsNot.not(IsEqual.equalTo(cipherText2)));
+    }
+
+    @Test
+    public void testRejectReplay() throws Exception {
+        src.setNext(new ValueCaptureInterceptor());
+        dest.setPrevious(new ValuesCaptureInterceptor());
+        src.start(Channel.SND_TX_SEQ);
+        dest.start(Channel.SND_TX_SEQ);
+        MemberImpl sender = createMember("127.0.0.1", 10001, 1);
+
+        String testInput = "The quick brown fox jumps over the lazy dog.";
 
+        ChannelData msg = new ChannelData(false);
         msg.setMessage(new XByteBuffer(testInput.getBytes("UTF-8"), false));
         src.sendMessage(null, msg, null);
 
-        byte[] cipherText2 = 
((ValueCaptureInterceptor)src.getNext()).getValue();
+        byte[] encrypted = ((ValueCaptureInterceptor) 
src.getNext()).getValue();
+
+        ChannelData incoming = new ChannelData(false);
+        XByteBuffer xbb = new XByteBuffer(encrypted.length, false);
+        xbb.append(encrypted, 0, encrypted.length);
+        incoming.setMessage(xbb);
+        incoming.setAddress(sender);
+        dest.messageReceived(incoming);
+
+        incoming = new ChannelData(false);
+        xbb = new XByteBuffer(encrypted.length, false);
+        xbb.append(encrypted, 0, encrypted.length);
+        incoming.setMessage(xbb);
+        incoming.setAddress(sender);
+        dest.messageReceived(incoming);
+
+        Collection<byte[]> messages = ((ValuesCaptureInterceptor) 
dest.getPrevious()).getValues();
+        Assert.assertEquals(1, messages.size());
+        Assert.assertArrayEquals(testInput.getBytes("UTF-8"), 
messages.iterator().next());
+    }
+
+    @Test
+    public void testReplayWindowRejectsOldMessage() throws Exception {
+        src.setNext(new ValueCaptureInterceptor());
+        dest.setPrevious(new ValuesCaptureInterceptor());
+        dest.setReplayWindowSize(2);
+        src.start(Channel.SND_TX_SEQ);
+        dest.start(Channel.SND_TX_SEQ);
+        MemberImpl sender = createMember("127.0.0.1", 10001, 1);
+
+        byte[][] encrypted = new byte[3][];
+        for (int i = 0; i < encrypted.length; i++) {
+            ChannelData msg = new ChannelData(false);
+            msg.setMessage(new XByteBuffer(Long.toString(i).getBytes("UTF-8"), 
false));
+            src.sendMessage(null, msg, null);
+            encrypted[i] = ((ValueCaptureInterceptor) 
src.getNext()).getValue();
+            ChannelData incoming = new ChannelData(false);
+            XByteBuffer xbb = new XByteBuffer(encrypted[i].length, false);
+            xbb.append(encrypted[i], 0, encrypted[i].length);
+            incoming.setMessage(xbb);
+            incoming.setAddress(sender);
+            dest.messageReceived(incoming);
+        }
+
+        ChannelData replay = new ChannelData(false);
+        XByteBuffer xbb = new XByteBuffer(encrypted[0].length, false);
+        xbb.append(encrypted[0], 0, encrypted[0].length);
+        replay.setMessage(xbb);
+        replay.setAddress(sender);
+        dest.messageReceived(replay);
+
+        Collection<byte[]> messages = ((ValuesCaptureInterceptor) 
dest.getPrevious()).getValues();
+        Assert.assertEquals(3, messages.size());
+    }
+
+    @Test
+    public void testAcceptSameMessageNumberFromDifferentMembers() throws 
Exception {
+        EncryptInterceptor src1 = new EncryptInterceptor();
+        src1.setEncryptionKey(encryptionKey128);
+        src1.setNext(new ValueCaptureInterceptor());
+        src1.start(Channel.SND_TX_SEQ);
+
+        EncryptInterceptor src2 = new EncryptInterceptor();
+        src2.setEncryptionKey(encryptionKey128);
+        src2.setNext(new ValueCaptureInterceptor());
+        src2.start(Channel.SND_TX_SEQ);
 
-        MatcherAssert.assertThat("Two identical cleartexts encrypt to the same 
ciphertext",
-                cipherText1, IsNot.not(IsEqual.equalTo(cipherText2)));
+        dest.setPrevious(new ValuesCaptureInterceptor());
+        dest.start(Channel.SND_TX_SEQ);
+
+        ChannelData msg = new ChannelData(false);
+        msg.setMessage(new XByteBuffer("msg-1".getBytes("UTF-8"), false));
+        src1.sendMessage(null, msg, null);
+        byte[] encrypted1 = ((ValueCaptureInterceptor) 
src1.getNext()).getValue();
+
+        msg = new ChannelData(false);
+        msg.setMessage(new XByteBuffer("msg-2".getBytes("UTF-8"), false));
+        src2.sendMessage(null, msg, null);
+        byte[] encrypted2 = ((ValueCaptureInterceptor) 
src2.getNext()).getValue();
+
+        ChannelData incoming = new ChannelData(false);
+        XByteBuffer xbb = new XByteBuffer(encrypted1.length, false);
+        xbb.append(encrypted1, 0, encrypted1.length);
+        incoming.setMessage(xbb);
+        incoming.setAddress(createMember("127.0.0.1", 10001, 1));
+        dest.messageReceived(incoming);
+
+        incoming = new ChannelData(false);
+        xbb = new XByteBuffer(encrypted2.length, false);
+        xbb.append(encrypted2, 0, encrypted2.length);
+        incoming.setMessage(xbb);
+        incoming.setAddress(createMember("127.0.0.1", 10002, 2));
+        dest.messageReceived(incoming);
+
+        Collection<byte[]> messages = ((ValuesCaptureInterceptor) 
dest.getPrevious()).getValues();
+        Assert.assertEquals(2, messages.size());
+    }
+
+    @Test
+    public void testMemberDisappearedStoresHeadValue() throws Exception {
+        src.setNext(new ValueCaptureInterceptor());
+        dest.setPrevious(new ValuesCaptureInterceptor());
+        dest.setReplayWindowSize(4);
+        src.start(Channel.SND_TX_SEQ);
+        dest.start(Channel.SND_TX_SEQ);
+        MemberImpl sender = createMember("127.0.0.1", 10001, 1);
+
+        for (int i = 0; i < 3; i++) {
+            ChannelData msg = new ChannelData(false);
+            msg.setMessage(new XByteBuffer(Long.toString(i).getBytes("UTF-8"), 
false));
+            src.sendMessage(null, msg, null);
+            byte[] encrypted = ((ValueCaptureInterceptor) 
src.getNext()).getValue();
+
+            ChannelData incoming = new ChannelData(false);
+            XByteBuffer xbb = new XByteBuffer(encrypted.length, false);
+            xbb.append(encrypted, 0, encrypted.length);
+            incoming.setMessage(xbb);
+            incoming.setAddress(sender);
+            dest.messageReceived(incoming);
+        }
+
+        dest.memberDisappeared(sender);
+
+        Assert.assertEquals(Long.valueOf(2), 
dest.getRemovedMemberHeadValue(sender));
+    }
+
+    @Test
+    public void testRemovedMemberHeadValueInitializesReplacementTracker() 
throws Exception {
+        src.setNext(new ValueCaptureInterceptor());
+        dest.setPrevious(new ValuesCaptureInterceptor());
+        dest.setReplayWindowSize(4);
+        src.start(Channel.SND_TX_SEQ);
+        dest.start(Channel.SND_TX_SEQ);
+        MemberImpl sender = createMember("127.0.0.1", 10001, 1);
+
+        byte[][] encrypted = new byte[4][];
+        for (int i = 0; i < 3; i++) {
+            ChannelData msg = new ChannelData(false);
+            msg.setMessage(new XByteBuffer(Long.toString(i).getBytes("UTF-8"), 
false));
+            src.sendMessage(null, msg, null);
+            encrypted[i] = ((ValueCaptureInterceptor) 
src.getNext()).getValue();
+
+            ChannelData incoming = new ChannelData(false);
+            XByteBuffer xbb = new XByteBuffer(encrypted[i].length, false);
+            xbb.append(encrypted[i], 0, encrypted[i].length);
+            incoming.setMessage(xbb);
+            incoming.setAddress(sender);
+            dest.messageReceived(incoming);
+        }
+
+        dest.memberDisappeared(sender);
+        Assert.assertEquals(Long.valueOf(2), 
dest.getRemovedMemberHeadValue(sender));
+
+        ChannelData msg = new ChannelData(false);
+        msg.setMessage(new XByteBuffer("3".getBytes("UTF-8"), false));
+        src.sendMessage(null, msg, null);
+        encrypted[3] = ((ValueCaptureInterceptor) src.getNext()).getValue();
+
+        ChannelData incoming = new ChannelData(false);
+        XByteBuffer xbb = new XByteBuffer(encrypted[3].length, false);
+        xbb.append(encrypted[3], 0, encrypted[3].length);
+        incoming.setMessage(xbb);
+        incoming.setAddress(sender);
+        dest.messageReceived(incoming);
+
+        Assert.assertNull(dest.getRemovedMemberHeadValue(sender));
+
+        incoming = new ChannelData(false);
+        xbb = new XByteBuffer(encrypted[2].length, false);
+        xbb.append(encrypted[2], 0, encrypted[2].length);
+        incoming.setMessage(xbb);
+        incoming.setAddress(sender);
+        dest.messageReceived(incoming);
+
+        Collection<byte[]> messages = ((ValuesCaptureInterceptor) 
dest.getPrevious()).getValues();
+        Assert.assertEquals(4, messages.size());
     }
 
     @Test
     public void testPickup() throws Exception {
         File file = new File(MESSAGE_FILE);
-        if(!file.exists()) {
+        if (!file.exists()) {
             System.err.println("File message.bin does not exist. Skipping 
test.");
             return;
         }
@@ -242,8 +412,7 @@ public class TestEncryptInterceptor extends 
EncryptionInterceptorBaseTest {
     }
 
     /*
-     * This test isn't guaranteed to catch any multithreaded issues, but it
-     * gives a good exercise.
+     * This test isn't guaranteed to catch any multithreaded issues, but it 
gives a good exercise.
      */
     @Test
     public void testMultithreaded() throws Exception {
@@ -266,7 +435,7 @@ public class TestEncryptInterceptor extends 
EncryptionInterceptorBaseTest {
                     xbb.append(bytes, 0, bytes.length);
                     msg.setMessage(xbb);
 
-                    for(int i=0; i<messagesPerThread; ++i) {
+                    for (int i = 0; i < messagesPerThread; ++i) {
                         src.sendMessage(null, msg, null);
                     }
                 } catch (ChannelException e) {
@@ -276,26 +445,25 @@ public class TestEncryptInterceptor extends 
EncryptionInterceptorBaseTest {
         };
 
         Thread[] threads = new Thread[numThreads];
-        for(int i=0; i<numThreads; ++i) {
+        for (int i = 0; i < numThreads; ++i) {
             threads[i] = new Thread(job);
             threads[i].setName("Message-Thread-" + i);
         }
 
-        for(int i=0; i<numThreads; ++i) {
+        for (int i = 0; i < numThreads; ++i) {
             threads[i].start();
         }
 
-        for(int i=0; i<numThreads; ++i) {
+        for (int i = 0; i < numThreads; ++i) {
             threads[i].join();
         }
 
         // Check all received messages to make sure they are not corrupted
-        Collection<byte[]> messages = 
((ValuesCaptureInterceptor)dest.getPrevious()).getValues();
+        Collection<byte[]> messages = ((ValuesCaptureInterceptor) 
dest.getPrevious()).getValues();
 
-        Assert.assertEquals("Did not receive all expected messages",
-                numThreads * messagesPerThread, messages.size());
+        Assert.assertEquals("Did not receive all expected messages", 
numThreads * messagesPerThread, messages.size());
 
-        for(byte[] message : messages) {
+        for (byte[] message : messages) {
             Assert.assertArrayEquals("Message is corrupted", message, bytes);
         }
     }
@@ -316,4 +484,10 @@ public class TestEncryptInterceptor extends 
EncryptionInterceptorBaseTest {
             Assert.fail("EncryptionInterceptor should throw 
ChannelConfigException, not " + t.getClass().getName());
         }
     }
+
+    private MemberImpl createMember(String host, int port, int uniqueIdSeed) 
throws Exception {
+        MemberImpl member = new MemberImpl(host, port, 0);
+        member.setUniqueId(new byte[] { (byte) uniqueIdSeed, 0, 0, 0, 0, 0, 0, 
0, 0, 0, 0, 0, 0, 0, 0, 0 });
+        return member;
+    }
 }
diff --git a/test/org/apache/catalina/tribes/util/TestCyclicTracker.java 
b/test/org/apache/catalina/tribes/util/TestCyclicTracker.java
new file mode 100644
index 0000000000..c7eeaab5d2
--- /dev/null
+++ b/test/org/apache/catalina/tribes/util/TestCyclicTracker.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCyclicTracker {
+
+    @Test
+    public void testRejectsDuplicateWithinWindow() {
+        CyclicTracker tracker = new CyclicTracker(4);
+
+        Assert.assertTrue(tracker.track(10));
+        Assert.assertTrue(tracker.track(11));
+        Assert.assertFalse(tracker.track(10));
+        Assert.assertFalse(tracker.track(11));
+    }
+
+
+    @Test
+    public void testAcceptsSkippedValueWithinWindow() {
+        CyclicTracker tracker = new CyclicTracker(4);
+
+        Assert.assertTrue(tracker.track(10));
+        Assert.assertTrue(tracker.track(12));
+        Assert.assertTrue(tracker.track(11));
+        Assert.assertFalse(tracker.track(11));
+    }
+
+
+    @Test
+    public void testRejectsValuesThatAreTooOld() {
+        CyclicTracker tracker = new CyclicTracker(4);
+
+        Assert.assertTrue(tracker.track(10));
+        Assert.assertTrue(tracker.track(11));
+        Assert.assertTrue(tracker.track(12));
+        Assert.assertTrue(tracker.track(13));
+        Assert.assertFalse(tracker.track(9));
+    }
+
+
+    @Test
+    public void testAdvancePastWindowClearsOlderEntries() {
+        CyclicTracker tracker = new CyclicTracker(4);
+
+        Assert.assertTrue(tracker.track(10));
+        Assert.assertTrue(tracker.track(20));
+        Assert.assertTrue(tracker.track(17));
+        Assert.assertFalse(tracker.track(16));
+        Assert.assertFalse(tracker.track(10));
+    }
+
+
+    @Test
+    public void testAcceptsValuesFarAhead() {
+        CyclicTracker tracker = new CyclicTracker(4);
+
+        Assert.assertTrue(tracker.track(10));
+        Assert.assertTrue(tracker.track(Long.MIN_VALUE));
+        Assert.assertFalse(tracker.track(Long.MIN_VALUE));
+    }
+
+
+    @Test
+    public void testHandlesLongOverflow01() {
+        CyclicTracker tracker = new CyclicTracker(4);
+
+        Assert.assertTrue(tracker.track(Long.MAX_VALUE - 1));
+        Assert.assertTrue(tracker.track(Long.MAX_VALUE));
+        Assert.assertTrue(tracker.track(Long.MIN_VALUE));
+        Assert.assertTrue(tracker.track(Long.MIN_VALUE + 1));
+
+        Assert.assertFalse(tracker.track(Long.MAX_VALUE));
+        Assert.assertFalse(tracker.track(Long.MAX_VALUE - 2));
+    }
+
+
+    @Test
+    public void testHandlesLongOverflow02() {
+        CyclicTracker tracker = new CyclicTracker(4);
+
+        Assert.assertTrue(tracker.track(Long.MAX_VALUE - 1));
+        Assert.assertTrue(tracker.track(Long.MIN_VALUE + 2));
+        Assert.assertTrue(tracker.track(Long.MAX_VALUE));
+        Assert.assertFalse(tracker.track(Long.MAX_VALUE - 2));
+    }
+
+
+    @Test
+    public void testGetHeadValue() {
+        CyclicTracker tracker = new CyclicTracker(4);
+
+        Assert.assertTrue(tracker.track(10));
+        Assert.assertEquals(10, tracker.getHeadValue());
+        Assert.assertTrue(tracker.track(12));
+        Assert.assertEquals(12, tracker.getHeadValue());
+    }
+
+
+    @Test(expected = IllegalStateException.class)
+    public void testGetHeadValueNotInitialized() {
+        CyclicTracker tracker = new CyclicTracker(4);
+
+        tracker.getHeadValue();
+    }
+
+
+    @SuppressWarnings("unused")
+    @Test(expected = IllegalArgumentException.class)
+    public void testRejectsZeroSize() {
+        new CyclicTracker(0);
+    }
+}
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 26d01987ed..10dd3166eb 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -602,6 +602,9 @@
         Fix concurrency issues generating MD5 digests in the
         <code>CloudMembershipProvider</code> implementations. (markt)
       </fix>
+      <add>
+        Add replay protection to the <code>EncryptInterceptor</code>. (markt)
+      </add>
     </changelog>
   </subsection>
   <subsection name="WebSocket">
diff --git a/webapps/docs/config/cluster-interceptor.xml 
b/webapps/docs/config/cluster-interceptor.xml
index d0f5f88537..33bd7dc9b5 100644
--- a/webapps/docs/config/cluster-interceptor.xml
+++ b/webapps/docs/config/cluster-interceptor.xml
@@ -231,11 +231,19 @@
        <p>The default algorithm is <code>AES/GCM/NoPadding</code>.</p>
      </attribute>
      <attribute name="encryptionKey" required="true">
-       The key to be used with the encryption algorithm.
+       <p>The key to be used with the encryption algorithm.</p>
 
-       The key should be specified as hex-encoded bytes of the appropriate
+       <p>The key should be specified as hex-encoded bytes of the appropriate
        length for the algorithm (e.g. 16 bytes / 32 characters / 128 bits for
-       AES-128, 32 bytes / 64 characters / 256 bits for AES-256, etc.).
+       AES-128, 32 bytes / 64 characters / 256 bits for AES-256, etc.).</p>
+     </attribute>
+     <attribute name="replayWindowSize" required="false">
+       <p>The number of messages per sender remembered for replay 
detection.</p>
+
+       <p>Any message outside of this window will be rejected. Any message
+       inside this window will only be accepted once and any duplicates
+       rejected. If not specified, the default of <code>1024</code> will be
+       used.</p>
      </attribute>
    </attributes>
   </subsection>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to