This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cassandra-2.2
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-2.2 by this push:
new 25fd7bd Add support in jvm dtest to test thrift
25fd7bd is described below
commit 25fd7bd84f1931d2a44e90e629f794c4cd11aa46
Author: David Capwell <[email protected]>
AuthorDate: Fri Jul 24 18:16:37 2020 -0700
Add support in jvm dtest to test thrift
patch by David Capwell; reviewed by Alex Petrov,Jon Meredith for
CASSANDRA-15967
---
build.xml | 1 +
.../apache/cassandra/service/CassandraDaemon.java | 24 +++++++---
.../distributed/test/ThriftClientTest.java | 51 ++++++++++++++++++++++
.../distributed/test/ThriftClientUtils.java | 38 ++++++++++++++++
4 files changed, 109 insertions(+), 5 deletions(-)
diff --git a/build.xml b/build.xml
index 4c05a94..84172aa 100644
--- a/build.xml
+++ b/build.xml
@@ -1866,6 +1866,7 @@
<jar jarfile="${build.dir}/dtest-${base.version}.jar">
<zipgroupfileset dir="${build.lib}" includes="*.jar"
excludes="META-INF/*.SF"/>
<fileset dir="${build.classes.main}"/>
+ <fileset dir="${build.classes.thrift}"/>
<fileset dir="${test.classes}"/>
<fileset dir="${test.conf}" />
</jar>
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java
b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index a317ab3..7d85d9d 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.rmi.AccessException;
import java.rmi.AlreadyBoundException;
@@ -372,11 +373,6 @@ public class CassandraDaemon
if (sizeRecorderInterval > 0)
ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(SizeEstimatesRecorder.instance,
30, sizeRecorderInterval, TimeUnit.SECONDS);
- // Thrift
- InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress();
- int rpcPort = DatabaseDescriptor.getRpcPort();
- int listenBacklog = DatabaseDescriptor.getRpcListenBacklog();
- thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog);
initializeNativeTransport();
completeSetup();
@@ -384,6 +380,12 @@ public class CassandraDaemon
public void initializeNativeTransport()
{
+ // Thrift
+ InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress();
+ int rpcPort = DatabaseDescriptor.getRpcPort();
+ int listenBacklog = DatabaseDescriptor.getRpcListenBacklog();
+ thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog);
+
// Native transport
InetAddress nativeAddr = DatabaseDescriptor.getRpcAddress();
int nativePort = DatabaseDescriptor.getNativeTransportPort();
@@ -398,6 +400,12 @@ public class CassandraDaemon
throw new IllegalStateException("native transport should be set up
before it can be started");
nativeServer.start();
+ logger.info("Native server running on {}", new
InetSocketAddress(DatabaseDescriptor.getRpcAddress(),
DatabaseDescriptor.getNativeTransportPort()));
+
+ if (thriftServer == null)
+ throw new IllegalStateException("thrift transport should be set up
before it can be started");
+ thriftServer.start();
+ logger.info("Thrift server running on {}", new
InetSocketAddress(DatabaseDescriptor.getRpcAddress(),
DatabaseDescriptor.getRpcPort()));
}
private void validateTransportsCanStart()
@@ -567,6 +575,12 @@ public class CassandraDaemon
nativeServer.stopAndAwaitTermination();
nativeServer = null;
}
+
+ if (thriftServer != null)
+ {
+ thriftServer.stopAndAwaitTermination();
+ thriftServer = null;
+ }
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/ThriftClientTest.java
b/test/distributed/org/apache/cassandra/distributed/test/ThriftClientTest.java
new file mode 100644
index 0000000..c5d5b9b
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/ThriftClientTest.java
@@ -0,0 +1,51 @@
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.QueryResults;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.shared.AssertUtils;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.thrift.TException;
+
+public class ThriftClientTest extends TestBaseImpl
+{
+ @Test
+ public void writeThenReadCQL() throws IOException, TException
+ {
+ try (Cluster cluster = init(Cluster.build(1).withConfig(c ->
c.with(Feature.NATIVE_PROTOCOL)).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int,
value int, PRIMARY KEY (pk))");
+
+ ThriftClientUtils.thriftClient(cluster.get(1), thrift -> {
+ thrift.set_keyspace(KEYSPACE);
+ Mutation mutation = new Mutation();
+ ColumnOrSuperColumn csoc = new ColumnOrSuperColumn();
+ Column column = new Column();
+
column.setName(CompositeType.build(ByteBufferUtil.bytes("value")));
+ column.setValue(ByteBufferUtil.bytes(0));
+ column.setTimestamp(System.currentTimeMillis());
+ csoc.setColumn(column);
+ mutation.setColumn_or_supercolumn(csoc);
+
+
thrift.batch_mutate(Collections.singletonMap(ByteBufferUtil.bytes(0),
+
Collections.singletonMap("tbl", Arrays.asList(mutation))),
+
org.apache.cassandra.thrift.ConsistencyLevel.ALL);
+ });
+
+ SimpleQueryResult qr =
cluster.coordinator(1).executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl",
ConsistencyLevel.ALL);
+ AssertUtils.assertRows(qr, QueryResults.builder().row(0,
0).build());
+ }
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/ThriftClientUtils.java
b/test/distributed/org/apache/cassandra/distributed/test/ThriftClientUtils.java
new file mode 100644
index 0000000..51b153c
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/ThriftClientUtils.java
@@ -0,0 +1,38 @@
+package org.apache.cassandra.distributed.test;
+
+import java.net.InetSocketAddress;
+
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.TFramedTransportFactory;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TTransport;
+
+public final class ThriftClientUtils
+{
+ private ThriftClientUtils()
+ {
+
+ }
+
+ public static void thriftClient(IInstance instance, ThriftConsumer fn)
throws TException
+ {
+ //TODO dtest APIs only expose native address, doesn't expose all
addresses we listen to, so assume the default thrift port
+ thriftClient(new
InetSocketAddress(instance.broadcastAddress().getAddress(), 9160), fn);
+ }
+
+ public static void thriftClient(InetSocketAddress address, ThriftConsumer
fn) throws TException
+ {
+ try (TTransport transport = new
TFramedTransportFactory().openTransport(address.getAddress().getHostAddress(),
address.getPort()))
+ {
+ Cassandra.Client client = new Cassandra.Client(new
TBinaryProtocol(transport));
+ fn.accept(client);
+ }
+ }
+
+ public interface ThriftConsumer
+ {
+ void accept(Cassandra.Client client) throws TException;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]