This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 56b979bec6c2d7602982baad622f52860baa0cb9
Merge: a209139 9c50b1f
Author: David Capwell <[email protected]>
AuthorDate: Tue Jun 1 20:48:34 2021 -0700

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |  1 +
 .../apache/cassandra/metrics/ClientMetrics.java    | 16 ++++
 .../org/apache/cassandra/transport/Message.java    | 31 ++++++-
 .../cassandra/transport/ProtocolException.java     | 24 ++++++
 .../cassandra/transport/ProtocolVersion.java       | 17 ++--
 .../apache/cassandra/transport/SimpleClient.java   | 15 +++-
 .../org/apache/cassandra/utils/Throwables.java     | 11 +++
 .../test/UnableToParseClientMessageTest.java       | 97 ++++++++++++++++++++++
 .../cassandra/transport/MessagePayloadTest.java    | 11 +++
 .../cassandra/transport/ProtocolVersionTest.java   |  3 +-
 .../cassandra/transport/WrappedSimpleClient.java   | 72 ++++++++++++++++
 11 files changed, 285 insertions(+), 13 deletions(-)

diff --cc CHANGES.txt
index 474025d,923337e..2faa6d6
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,14 -1,6 +1,15 @@@
 -3.0.25:
 +3.11.11
 + * Fix LeveledCompactionStrategy compacts last level throw an 
ArrayIndexOutOfBoundsException (CASSANDRA-15669)
 + * Maps $CASSANDRA_LOG_DIR to cassandra.logdir java property when executing 
nodetool (CASSANDRA-16199)
 + * Nodetool garbagecollect should retain SSTableLevel for LCS 
(CASSANDRA-16634)
 + * Ignore stale acks received in the shadow round (CASSANDRA-16588)
 + * Add autocomplete and error messages for provide_overlapping_tombstones 
(CASSANDRA-16350)
 + * Add StorageServiceMBean.getKeyspaceReplicationInfo(keyspaceName) 
(CASSANDRA-16447)
 + * Make sure sstables with moved starts are removed correctly in 
LeveledGenerations (CASSANDRA-16552)
 + * Upgrade jackson-databind to 2.9.10.8 (CASSANDRA-16462)
 +Merged from 3.0:
+  * Failure to execute queries should emit a KPI other than read 
timeout/unavailable so it can be alerted/tracked (CASSANDRA-16581)
 - * Don't wait on schema versions from replacement target when replacing 
(CASSANDRA-16692)
 + * Don't wait on schema versions from replacement target when replacing a 
node (CASSANDRA-16692)
   * StandaloneVerifier does not fail when unable to verify SSTables, it only 
fails if Corruption is thrown (CASSANDRA-16683)
   * Fix bloom filter false ratio calculation by including true negatives 
(CASSANDRA-15834)
   * Prevent loss of commit log data when moving sstables between nodes 
(CASSANDRA-16619)
diff --cc src/java/org/apache/cassandra/transport/ProtocolException.java
index 6ef17ac,0b307a9..bdfdb6f
--- a/src/java/org/apache/cassandra/transport/ProtocolException.java
+++ b/src/java/org/apache/cassandra/transport/ProtocolException.java
@@@ -43,8 -43,37 +43,32 @@@ public class ProtocolException extends 
          return ExceptionCode.PROTOCOL_ERROR;
      }
  
 -    /**
 -     * If the ProtocolException is due to a connection being made with a 
protocol version that is lower
 -     * than Server.MIN_SUPPORTED_VERSION, this will return that unsupported 
protocol version.  Otherwise,
 -     * null is returned.
 -     */
 -    public Integer getAttemptedLowProtocolVersion()
 +    public ProtocolVersion getForcedProtocolVersion()
      {
 -        return attemptedLowProtocolVersion;
 +        return forcedProtocolVersion;
      }
+ 
+     public boolean isSilent()
+     {
+         return false;
+     }
+ 
+     public static ProtocolException toSilentException(ProtocolException e)
+     {
+         return new Silent(e);
+     }
+ 
+     private static class Silent extends ProtocolException
+     {
+         public Silent(ProtocolException cause)
+         {
 -            super(cause.getMessage(), cause.attemptedLowProtocolVersion);
++            super(cause.getMessage(), cause.forcedProtocolVersion);
+         }
+ 
+         @Override
+         public boolean isSilent()
+         {
+             return true;
+         }
+     }
  }
diff --cc src/java/org/apache/cassandra/transport/ProtocolVersion.java
index 05fafc2,0000000..4037817
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/transport/ProtocolVersion.java
+++ b/src/java/org/apache/cassandra/transport/ProtocolVersion.java
@@@ -1,153 -1,0 +1,160 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.transport;
 +
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.EnumSet;
 +import java.util.List;
 +import java.util.Optional;
 +
 +import org.apache.commons.lang3.ArrayUtils;
 +
 +/**
 + * The native (CQL binary) protocol version.
 + *
 + * Some versions may be in beta, which means that the client must
 + * specify the beta flag in the frame for the version to be considered valid.
 + * Beta versions must have the word "beta" in their description, this is 
mandated
 + * by the specs.
 + *
 + */
 +public enum ProtocolVersion implements Comparable<ProtocolVersion>
 +{
 +    // The order is important as it defines the chronological history of 
versions, which is used
 +    // to determine if a feature is supported or some serdes formats
 +    V1(1, "v1", false), // no longer supported
 +    V2(2, "v2", false), // no longer supported
 +    V3(3, "v3", false),
 +    V4(4, "v4", false),
 +    V5(5, "v5-beta", true);
 +
 +    /** The version number */
 +    private final int num;
 +
 +    /** A description of the version, beta versions should have the word 
"-beta" */
 +    private final String descr;
 +
 +    /** Set this to true for beta versions */
 +    private final boolean beta;
 +
 +    ProtocolVersion(int num, String descr, boolean beta)
 +    {
 +        this.num = num;
 +        this.descr = descr;
 +        this.beta = beta;
 +    }
 +
 +    /** The supported versions stored as an array, these should be private 
and are required for fast decoding*/
 +    private final static ProtocolVersion[] SUPPORTED_VERSIONS = new 
ProtocolVersion[] { V3, V4, V5 };
 +    final static ProtocolVersion MIN_SUPPORTED_VERSION = 
SUPPORTED_VERSIONS[0];
 +    final static ProtocolVersion MAX_SUPPORTED_VERSION = 
SUPPORTED_VERSIONS[SUPPORTED_VERSIONS.length - 1];
++    /** These versions are sent by some clients, but are not valid Apache 
Cassandra versions (66, and 65 are DSE versions) */
++    private static int[] KNOWN_INVALID_VERSIONS = { 66, 65};
 +
 +    /** All supported versions, published as an enumset */
 +    public final static EnumSet<ProtocolVersion> SUPPORTED = 
EnumSet.copyOf(Arrays.asList((ProtocolVersion[]) 
ArrayUtils.addAll(SUPPORTED_VERSIONS)));
 +
 +    /** Old unsupported versions, this is OK as long as we never add newer 
unsupported versions */
 +    public final static EnumSet<ProtocolVersion> UNSUPPORTED = 
EnumSet.complementOf(SUPPORTED);
 +
 +    /** The preferred versions */
 +    public final static ProtocolVersion CURRENT = V4;
 +    public final static Optional<ProtocolVersion> BETA = Optional.of(V5);
 +
 +    public static List<String> supportedVersions()
 +    {
 +        List<String> ret = new ArrayList<>(SUPPORTED.size());
 +        for (ProtocolVersion version : SUPPORTED)
 +            ret.add(version.toString());
 +        return ret;
 +    }
 +
 +    public static ProtocolVersion decode(int versionNum, ProtocolVersionLimit 
ceiling)
 +    {
 +        ProtocolVersion ret = versionNum >= MIN_SUPPORTED_VERSION.num && 
versionNum <= ceiling.getMaxVersion().num
 +                              ? SUPPORTED_VERSIONS[versionNum - 
MIN_SUPPORTED_VERSION.num]
 +                              : null;
 +
 +        if (ret == null)
 +        {
 +            // if this is not a supported version check the old versions
-             for (ProtocolVersion version : UNSUPPORTED)
++            for (ProtocolVersion dseVersion : UNSUPPORTED)
 +            {
 +                // if it is an old version that is no longer supported this 
ensures that we reply
 +                // with that same version
-                 if (version.num == versionNum)
-                     throw new 
ProtocolException(ProtocolVersion.invalidVersionMessage(versionNum), version);
++                if (dseVersion.num == versionNum)
++                    throw new 
ProtocolException(ProtocolVersion.invalidVersionMessage(versionNum), 
dseVersion);
++            }
++            for (int version : KNOWN_INVALID_VERSIONS)
++            {
++                if (versionNum == version)
++                    throw ProtocolException.toSilentException(new 
ProtocolException(ProtocolVersion.invalidVersionMessage(versionNum)));
 +            }
 +
-             // If the version is invalid reply with the highest version that 
we support
-             throw new ProtocolException(invalidVersionMessage(versionNum), 
ceiling.getMaxVersion());
++            // If the version is invalid reply with the channel's version
++            throw new ProtocolException(invalidVersionMessage(versionNum));
 +        }
 +
 +        return ret;
 +    }
 +
 +    public boolean isBeta()
 +    {
 +        return beta;
 +    }
 +
 +    public static String invalidVersionMessage(int version)
 +    {
 +        return String.format("Invalid or unsupported protocol version (%d); 
supported versions are (%s)",
 +                             version, String.join(", ", 
ProtocolVersion.supportedVersions()));
 +    }
 +
 +    public int asInt()
 +    {
 +        return num;
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        // This format is mandated by the protocl specs for the SUPPORTED 
message, see OptionsMessage execute().
 +        return String.format("%d/%s", num, descr);
 +    }
 +
 +    public final boolean isGreaterThan(ProtocolVersion other)
 +    {
 +        return num > other.num;
 +    }
 +
 +    public final boolean isGreaterOrEqualTo(ProtocolVersion other)
 +    {
 +        return num >= other.num;
 +    }
 +
 +    public final boolean isSmallerThan(ProtocolVersion other)
 +    {
 +        return num < other.num;
 +    }
 +
 +    public final boolean isSmallerOrEqualTo(ProtocolVersion other)
 +    {
 +        return num <= other.num;
 +    }
 +}
diff --cc 
test/distributed/org/apache/cassandra/distributed/test/UnableToParseClientMessageTest.java
index 0000000,75cc905..7bfe5a0
mode 000000,100644..100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/UnableToParseClientMessageTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/UnableToParseClientMessageTest.java
@@@ -1,0 -1,88 +1,97 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.distributed.test;
+ 
+ import java.io.IOException;
+ import java.nio.charset.StandardCharsets;
+ import java.util.List;
+ 
+ import org.junit.Assert;
++import org.junit.BeforeClass;
+ import org.junit.Test;
+ 
+ import io.netty.buffer.Unpooled;
+ import org.apache.cassandra.Util;
++import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.Feature;
+ import org.apache.cassandra.distributed.api.IInvokableInstance;
+ import org.apache.cassandra.metrics.CassandraMetricsRegistry;
+ import org.apache.cassandra.transport.WrappedSimpleClient;
+ 
+ /**
+  * If a client sends a message that can not be parsed by the server then we 
need to detect this and update metrics
+  * for monitoring.
+  *
+  * An issue was found between 2.1 to 3.0 upgrades with regards to paging 
serialization. Since
+  * this is a serialization issue we hit similar paths by sending bad bytes to 
the server, so can simulate the mixed-mode
+  * paging issue without needing to send proper messages.
+  */
+ public class UnableToParseClientMessageTest extends TestBaseImpl
+ {
++    @BeforeClass
++    public static void setup()
++    {
++        DatabaseDescriptor.daemonInitialization();
++    }
++
+     @Test
+     public void badMessageCausesProtocolException() throws IOException, 
InterruptedException
+     {
+         try (Cluster cluster = init(Cluster.build(1).withConfig(c -> 
c.with(Feature.values())).start()))
+         {
+             // write gibberish to the native protocol
+             IInvokableInstance node = cluster.get(1);
+             // make sure everything is fine at the start
+             node.runOnInstance(() -> {
+                 Assert.assertEquals(0, 
CassandraMetricsRegistry.Metrics.getMeters()
+                                                                        
.get("org.apache.cassandra.metrics.Client.ProtocolException")
+                                                                        
.getCount());
+                 Assert.assertEquals(0, 
CassandraMetricsRegistry.Metrics.getMeters()
+                                                                        
.get("org.apache.cassandra.metrics.Client.UnknownException")
+                                                                        
.getCount());
+             });
+ 
+             try (WrappedSimpleClient client = new 
WrappedSimpleClient("127.0.0.1", 9042))
+             {
+                 client.connect(false, true);
+ 
+                 // this should return a failed response
+                 String response = client.write(Unpooled.wrappedBuffer("This 
is just a test".getBytes(StandardCharsets.UTF_8)), false).toString();
+                 Assert.assertTrue("Resposne '" + response + "' expected to 
contain 'Invalid or unsupported protocol version (84); the lowest supported 
version is 3 and the greatest is 4'",
 -                                  response.contains("Invalid or unsupported 
protocol version (84); the lowest supported version is 3 and the greatest is 
4"));
++                                  response.contains("Invalid or unsupported 
protocol version (84)"));
+ 
+                 node.runOnInstance(() -> {
+                     Util.spinAssertEquals(1L,
+                                           () -> 
CassandraMetricsRegistry.Metrics.getMeters()
+                                                                               
  .get("org.apache.cassandra.metrics.Client.ProtocolException")
+                                                                               
  .getCount(),
+                                           10);
++
+                     Assert.assertEquals(0, 
CassandraMetricsRegistry.Metrics.getMeters()
+                                                                            
.get("org.apache.cassandra.metrics.Client.UnknownException")
+                                                                            
.getCount());
+                 });
+                 List<String> results = node.logs().grep("Protocol exception 
with client networking").getResult();
+                 results.forEach(s -> Assert.assertTrue("Expected logs '" + s 
+ "' to contain: Invalid or unsupported protocol version (84)",
+                                                        s.contains("Invalid or 
unsupported protocol version (84)")));
+                 Assert.assertEquals(1, results.size()); // this logs less 
offtan than metrics as the log has a nospamlogger wrapper
+             }
+         }
+     }
+ }
diff --cc test/unit/org/apache/cassandra/transport/ProtocolVersionTest.java
index 7b56a49,0000000..234435e
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/transport/ProtocolVersionTest.java
+++ b/test/unit/org/apache/cassandra/transport/ProtocolVersionTest.java
@@@ -1,97 -1,0 +1,96 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.transport;
 +
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +public class ProtocolVersionTest
 +{
 +    @Test
 +    public void testDecode()
 +    {
 +        for (ProtocolVersion version : ProtocolVersion.SUPPORTED)
 +            Assert.assertEquals(version, 
ProtocolVersion.decode(version.asInt(), ProtocolVersionLimit.SERVER_DEFAULT));
 +
 +        for (ProtocolVersion version : ProtocolVersion.UNSUPPORTED)
 +        { // unsupported old versions
 +            try
 +            {
 +                Assert.assertEquals(version, 
ProtocolVersion.decode(version.asInt(), ProtocolVersionLimit.SERVER_DEFAULT));
 +                Assert.fail("Expected invalid protocol exception");
 +            }
 +            catch (ProtocolException ex)
 +            {
 +                Assert.assertNotNull(ex.getForcedProtocolVersion());
 +                Assert.assertEquals(version, ex.getForcedProtocolVersion());
 +            }
 +        }
 +
 +        try
 +        { // unsupported newer version
 +            Assert.assertEquals(null, ProtocolVersion.decode(63, 
ProtocolVersionLimit.SERVER_DEFAULT));
 +            Assert.fail("Expected invalid protocol exception");
 +        }
 +        catch (ProtocolException ex)
 +        {
-             Assert.assertNotNull(ex.getForcedProtocolVersion());
-             Assert.assertEquals(ProtocolVersion.MAX_SUPPORTED_VERSION, 
ex.getForcedProtocolVersion());
++            Assert.assertNull(ex.getForcedProtocolVersion());
 +        }
 +    }
 +
 +    @Test
 +    public void testSupportedVersions()
 +    {
 +        Assert.assertTrue(ProtocolVersion.supportedVersions().size() >= 2); 
// at least one OS and one DSE
 +        Assert.assertNotNull(ProtocolVersion.CURRENT);
 +
 +        Assert.assertFalse(ProtocolVersion.V4.isBeta());
 +        Assert.assertTrue(ProtocolVersion.V5.isBeta());
 +    }
 +
 +    @Test
 +    public void testComparisons()
 +    {
 +        
Assert.assertTrue(ProtocolVersion.V1.isSmallerOrEqualTo(ProtocolVersion.V1));
 +        
Assert.assertTrue(ProtocolVersion.V2.isSmallerOrEqualTo(ProtocolVersion.V2));
 +        
Assert.assertTrue(ProtocolVersion.V3.isSmallerOrEqualTo(ProtocolVersion.V3));
 +        
Assert.assertTrue(ProtocolVersion.V4.isSmallerOrEqualTo(ProtocolVersion.V4));
 +
 +        
Assert.assertTrue(ProtocolVersion.V1.isGreaterOrEqualTo(ProtocolVersion.V1));
 +        
Assert.assertTrue(ProtocolVersion.V2.isGreaterOrEqualTo(ProtocolVersion.V2));
 +        
Assert.assertTrue(ProtocolVersion.V3.isGreaterOrEqualTo(ProtocolVersion.V3));
 +        
Assert.assertTrue(ProtocolVersion.V4.isGreaterOrEqualTo(ProtocolVersion.V4));
 +
 +        
Assert.assertTrue(ProtocolVersion.V1.isSmallerThan(ProtocolVersion.V2));
 +        
Assert.assertTrue(ProtocolVersion.V2.isSmallerThan(ProtocolVersion.V3));
 +        
Assert.assertTrue(ProtocolVersion.V3.isSmallerThan(ProtocolVersion.V4));
 +
 +        
Assert.assertFalse(ProtocolVersion.V1.isGreaterThan(ProtocolVersion.V2));
 +        
Assert.assertFalse(ProtocolVersion.V2.isGreaterThan(ProtocolVersion.V3));
 +        
Assert.assertFalse(ProtocolVersion.V3.isGreaterThan(ProtocolVersion.V4));
 +
 +        
Assert.assertTrue(ProtocolVersion.V4.isGreaterThan(ProtocolVersion.V3));
 +        
Assert.assertTrue(ProtocolVersion.V3.isGreaterThan(ProtocolVersion.V2));
 +        
Assert.assertTrue(ProtocolVersion.V2.isGreaterThan(ProtocolVersion.V1));
 +
 +        
Assert.assertFalse(ProtocolVersion.V4.isSmallerThan(ProtocolVersion.V3));
 +        
Assert.assertFalse(ProtocolVersion.V3.isSmallerThan(ProtocolVersion.V2));
 +        
Assert.assertFalse(ProtocolVersion.V2.isSmallerThan(ProtocolVersion.V1));
 +    }
 +}
diff --cc test/unit/org/apache/cassandra/transport/WrappedSimpleClient.java
index 0000000,af7c1d7..151d284
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/transport/WrappedSimpleClient.java
+++ b/test/unit/org/apache/cassandra/transport/WrappedSimpleClient.java
@@@ -1,0 -1,67 +1,72 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.transport;
+ 
+ import io.netty.buffer.ByteBuf;
+ import org.apache.cassandra.config.EncryptionOptions;
+ import org.apache.cassandra.exceptions.ExceptionCode;
+ import org.apache.cassandra.transport.messages.ErrorMessage;
+ 
+ /**
+  * Enhances {@link SimpleClient} to add custom logic to send to the server.
+  */
+ public class WrappedSimpleClient extends SimpleClient
+ {
 -    public WrappedSimpleClient(String host, int port, int version, 
EncryptionOptions.ClientEncryptionOptions encryptionOptions)
++    public WrappedSimpleClient(String host, int port, ProtocolVersion 
version, EncryptionOptions.ClientEncryptionOptions encryptionOptions)
+     {
+         super(host, port, version, encryptionOptions);
+     }
+ 
+     public WrappedSimpleClient(String host, int port, 
EncryptionOptions.ClientEncryptionOptions encryptionOptions)
+     {
+         super(host, port, encryptionOptions);
+     }
+ 
 -    public WrappedSimpleClient(String host, int port, int version)
++    public WrappedSimpleClient(String host, int port, ProtocolVersion version)
+     {
+         super(host, port, version);
+     }
+ 
++    public WrappedSimpleClient(String host, int port, ProtocolVersion 
version, boolean useBeta, EncryptionOptions.ClientEncryptionOptions 
encryptionOptions)
++    {
++        super(host, port, version, useBeta, encryptionOptions);
++    }
++
+     public WrappedSimpleClient(String host, int port)
+     {
+         super(host, port);
+     }
+ 
+     public Message.Response write(ByteBuf buffer) throws InterruptedException
+     {
+         return write(buffer, true);
+     }
+ 
+     public Message.Response write(ByteBuf buffer, boolean 
awaitCloseOnProtocolError) throws InterruptedException
+     {
+         lastWriteFuture = channel.writeAndFlush(buffer);
+         Message.Response response = responseHandler.responses.take();
+         if (awaitCloseOnProtocolError
+             && response instanceof ErrorMessage && ((ErrorMessage) 
response).error.code() == ExceptionCode.PROTOCOL_ERROR)
+         {
+             // protocol errors shutdown the connection, wait for it to close
+             connection.channel().closeFuture().awaitUninterruptibly();
+         }
+         return response;
+     }
+ }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to