This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cassandra-2.2
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-2.2 by this push:
     new 25fd7bd  Add support in jvm dtest to test thrift
25fd7bd is described below

commit 25fd7bd84f1931d2a44e90e629f794c4cd11aa46
Author: David Capwell <[email protected]>
AuthorDate: Fri Jul 24 18:16:37 2020 -0700

    Add support in jvm dtest to test thrift
    
    patch by David Capwell; reviewed by Alex Petrov, Jon Meredith for CASSANDRA-15967
---
 build.xml                                          |  1 +
 .../apache/cassandra/service/CassandraDaemon.java  | 24 +++++++---
 .../distributed/test/ThriftClientTest.java         | 51 ++++++++++++++++++++++
 .../distributed/test/ThriftClientUtils.java        | 38 ++++++++++++++++
 4 files changed, 109 insertions(+), 5 deletions(-)

diff --git a/build.xml b/build.xml
index 4c05a94..84172aa 100644
--- a/build.xml
+++ b/build.xml
@@ -1866,6 +1866,7 @@
       <jar jarfile="${build.dir}/dtest-${base.version}.jar">
           <zipgroupfileset dir="${build.lib}" includes="*.jar" 
excludes="META-INF/*.SF"/>
           <fileset dir="${build.classes.main}"/>
+          <fileset dir="${build.classes.thrift}"/>
           <fileset dir="${test.classes}"/>
           <fileset dir="${test.conf}" />
       </jar>
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java 
b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index a317ab3..7d85d9d 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryPoolMXBean;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.rmi.AccessException;
 import java.rmi.AlreadyBoundException;
@@ -372,11 +373,6 @@ public class CassandraDaemon
         if (sizeRecorderInterval > 0)
             
ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(SizeEstimatesRecorder.instance,
 30, sizeRecorderInterval, TimeUnit.SECONDS);
 
-        // Thrift
-        InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress();
-        int rpcPort = DatabaseDescriptor.getRpcPort();
-        int listenBacklog = DatabaseDescriptor.getRpcListenBacklog();
-        thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog);
         initializeNativeTransport();
 
         completeSetup();
@@ -384,6 +380,12 @@ public class CassandraDaemon
 
     public void initializeNativeTransport()
     {
+        // Thrift
+        InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress();
+        int rpcPort = DatabaseDescriptor.getRpcPort();
+        int listenBacklog = DatabaseDescriptor.getRpcListenBacklog();
+        thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog);
+
         // Native transport
         InetAddress nativeAddr = DatabaseDescriptor.getRpcAddress();
         int nativePort = DatabaseDescriptor.getNativeTransportPort();
@@ -398,6 +400,12 @@ public class CassandraDaemon
             throw new IllegalStateException("native transport should be set up 
before it can be started");
 
         nativeServer.start();
+        logger.info("Native server running on {}", new 
InetSocketAddress(DatabaseDescriptor.getRpcAddress(), 
DatabaseDescriptor.getNativeTransportPort()));
+
+        if (thriftServer == null)
+            throw new IllegalStateException("thrift transport should be set up 
before it can be started");
+        thriftServer.start();
+        logger.info("Thrift server running on {}", new 
InetSocketAddress(DatabaseDescriptor.getRpcAddress(), 
DatabaseDescriptor.getRpcPort()));
     }
 
     private void validateTransportsCanStart()
@@ -567,6 +575,12 @@ public class CassandraDaemon
             nativeServer.stopAndAwaitTermination();
             nativeServer = null;
         }
+
+        if (thriftServer != null)
+        {
+            thriftServer.stopAndAwaitTermination();
+            thriftServer = null;
+        }
     }
 
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ThriftClientTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/ThriftClientTest.java
new file mode 100644
index 0000000..c5d5b9b
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ThriftClientTest.java
@@ -0,0 +1,51 @@
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.QueryResults;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.shared.AssertUtils;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.thrift.TException;
+
+public class ThriftClientTest extends TestBaseImpl // in-JVM dtest: write through Thrift, read back through CQL
+{
+    @Test
+    public void writeThenReadCQL() throws IOException, TException
+    {
+        try (Cluster cluster = init(Cluster.build(1).withConfig(c -> 
c.with(Feature.NATIVE_PROTOCOL)).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
value int, PRIMARY KEY (pk))");
+            // Through the Thrift client: build one column named "value" holding 0 and write it under key 0 in tbl.
+            ThriftClientUtils.thriftClient(cluster.get(1), thrift -> {
+                thrift.set_keyspace(KEYSPACE);
+                Mutation mutation = new Mutation();
+                ColumnOrSuperColumn csoc = new ColumnOrSuperColumn();
+                Column column = new Column();
+                
column.setName(CompositeType.build(ByteBufferUtil.bytes("value")));
+                column.setValue(ByteBufferUtil.bytes(0));
+                column.setTimestamp(System.currentTimeMillis());
+                csoc.setColumn(column);
+                mutation.setColumn_or_supercolumn(csoc);
+                // Send the mutation as a one-entry batch (key -> table -> mutations) at consistency level ALL.
+                
thrift.batch_mutate(Collections.singletonMap(ByteBufferUtil.bytes(0),
+                                                             
Collections.singletonMap("tbl", Arrays.asList(mutation))),
+                                    
org.apache.cassandra.thrift.ConsistencyLevel.ALL);
+            });
+            // Read the table back through the CQL coordinator and compare against the expected row (0, 0).
+            SimpleQueryResult qr = 
cluster.coordinator(1).executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", 
ConsistencyLevel.ALL);
+            AssertUtils.assertRows(qr, QueryResults.builder().row(0, 
0).build());
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ThriftClientUtils.java 
b/test/distributed/org/apache/cassandra/distributed/test/ThriftClientUtils.java
new file mode 100644
index 0000000..51b153c
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ThriftClientUtils.java
@@ -0,0 +1,38 @@
+package org.apache.cassandra.distributed.test;
+
+import java.net.InetSocketAddress;
+
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.TFramedTransportFactory;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TTransport;
+
+public final class ThriftClientUtils // static helpers that open a Thrift client for a callback
+{
+    private ThriftClientUtils()
+    {
+        // static helpers only; the private constructor prevents outside instantiation
+    }
+    // Connects to the instance's broadcast address on the hard-coded port 9160 and runs fn against it.
+    public static void thriftClient(IInstance instance, ThriftConsumer fn) 
throws TException
+    {
+        //TODO dtest APIs only expose native address, doesn't expose all 
addresses we listen to, so assume the default thrift port
+        thriftClient(new 
InetSocketAddress(instance.broadcastAddress().getAddress(), 9160), fn);
+    }
+    // Opens a framed transport to address, passes a binary-protocol Cassandra.Client to fn, then closes the transport.
+    public static void thriftClient(InetSocketAddress address, ThriftConsumer 
fn) throws TException
+    {
+        try (TTransport transport = new 
TFramedTransportFactory().openTransport(address.getAddress().getHostAddress(), 
address.getPort()))
+        {
+            Cassandra.Client client = new Cassandra.Client(new 
TBinaryProtocol(transport));
+            fn.accept(client);
+        }
+    }
+    // Callback that receives the connected client; it is allowed to throw TException.
+    public interface ThriftConsumer
+    {
+        void accept(Cassandra.Client client) throws TException;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to