This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 4c6c98b43b5 [fix][test] Enable Cassandra sink tests (#18430)
4c6c98b43b5 is described below

commit 4c6c98b43b520b41f555acf18f3dfe596dded97e
Author: tison <[email protected]>
AuthorDate: Mon Nov 14 17:37:40 2022 +0800

    [fix][test] Enable Cassandra sink tests (#18430)
    
    * [fix][test] Enable Cassandra sink tests
    
    This is based on https://github.com/apache/pulsar/pull/4036.
    
    Signed-off-by: tison <[email protected]>
    Co-authored-by: Sanjeev Kulkarni <[email protected]>
    
    * avoid class not found
    
    Following https://docs.datastax.com/en/developer/java-driver/3.5/manual/metrics/#metrics-4-compatibility.
    
    Signed-off-by: tison <[email protected]>
    Co-authored-by: Sanjeev Kulkarni <[email protected]>
---
 .../pulsar/io/cassandra/CassandraAbstractSink.java    |  2 +-
 .../integration/io/sinks/CassandraSinkTester.java     | 19 +++++++++++--------
 .../integration/io/sinks/PulsarIOSinkRunner.java      |  4 ++--
 .../tests/integration/io/sinks/PulsarSinksTest.java   |  4 ++--
 4 files changed, 16 insertions(+), 13 deletions(-)

diff --git a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
index 7a872ff6504..11a6684b49d 100644
--- a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
+++ b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
@@ -99,7 +99,7 @@ public abstract class CassandraAbstractSink<K, V> implements Sink<byte[]> {
                 b.withPort(Integer.parseInt(hostPort[1]));
             }
         }
-        cluster = b.build();
+        cluster = b.withoutJMXReporting().build();
         session = cluster.connect();
         session.execute("USE " + cassandraSinkConfig.getKeyspace());
     }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/CassandraSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/CassandraSinkTester.java
index d545ddfd08f..f062124018d 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/CassandraSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/CassandraSinkTester.java
@@ -23,6 +23,7 @@ import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.tests.integration.containers.CassandraContainer;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 
@@ -52,6 +53,7 @@ public class CassandraSinkTester extends SinkTester<CassandraContainer> {
     private static final String ROOTS = "cassandra";
     private static final String KEY = "key";
     private static final String COLUMN = "col";
+    private static final String ARCHIVE = "/pulsar/connectors/pulsar-io-cassandra-" + PulsarVersion.getVersion() + ".nar";
 
     private final String keySpace;
     private final String tableName;
@@ -60,7 +62,7 @@ public class CassandraSinkTester extends SinkTester<CassandraContainer> {
     private Session session;
 
     private CassandraSinkTester() {
-        super(NAME, "/pulsar/connectors/pulsar-io-cassandra-2.2.0-incubating-SNAPSHOT.nar", "org.apache.pulsar.io.cassandra.CassandraStringSink");
+        super(NAME, ARCHIVE, "org.apache.pulsar.io.cassandra.CassandraStringSink");
 
         String suffix = randomName(8) + "_" + System.currentTimeMillis();
         this.keySpace = "keySpace_" + suffix;
@@ -96,24 +98,25 @@ public class CassandraSinkTester extends SinkTester<CassandraContainer> {
     public void prepareSink() {
         // build the sink
         cluster = Cluster.builder()
-            .addContactPoint("localhost")
-            .withPort(serviceContainer.getCassandraPort())
-            .build();
+                .addContactPoint("localhost")
+                .withPort(serviceContainer.getCassandraPort())
+                .withoutJMXReporting()
+                .build();
 
         // connect to the cluster
         session = cluster.connect();
         log.info("Connecting to cassandra cluster at localhost:{}", serviceContainer.getCassandraPort());
 
         String createKeySpace =
-            "CREATE KEYSPACE " + keySpace
-                + " WITH replication = {'class':'SimpleStrategy', 'replication_factor':1}; ";
+                "CREATE KEYSPACE " + keySpace
+                        + " WITH replication = {'class':'SimpleStrategy', 'replication_factor':1}; ";
         log.info(createKeySpace);
         session.execute(createKeySpace);
         session.execute("USE " + keySpace);
 
         String createTable = "CREATE TABLE " + tableName
-            + "(" + KEY + " text PRIMARY KEY, "
-            + COLUMN + " text);";
+                + "(" + KEY + " text PRIMARY KEY, "
+                + COLUMN + " text);";
         log.info(createTable);
         session.execute(createTable);
     }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java
index 5fa96d77565..341d723adcc 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java
@@ -55,7 +55,7 @@ public class PulsarIOSinkRunner extends PulsarIOTestRunner {
                super(cluster, functionRuntimeType);
        }
 
-       @SuppressWarnings({ "rawtypes", "unchecked" })
+       @SuppressWarnings({ "rawtypes" })
        public <T extends GenericContainer> void runSinkTester(SinkTester<T> tester, boolean builtin) throws Exception {
         final String tenant = TopicName.PUBLIC_TENANT;
         final String namespace = TopicName.DEFAULT_NAMESPACE;
@@ -201,7 +201,7 @@ public class PulsarIOSinkRunner extends PulsarIOTestRunner {
         } else {
             commands = new String[] {
                     PulsarCluster.ADMIN_SCRIPT,
-                    "sink", "create",
+                    "sink", "update",
                     "--tenant", tenant,
                     "--namespace", namespace,
                     "--name", sinkName,
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
index bec9c99e8a8..4b4fa6449e9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
@@ -40,12 +40,12 @@ public class PulsarSinksTest extends PulsarIOTestBase {
         testSink(new KafkaSinkTester(kafkaContainerName), true, new KafkaSourceTester(kafkaContainerName));
     }
 
-    @Test(enabled = false, groups = "sink")
+    @Test(groups = "sink")
     public void testCassandraSink() throws Exception {
         testSink(CassandraSinkTester.createTester(true), true);
     }
 
-    @Test(enabled = false, groups = "sink")
+    @Test(groups = "sink")
     public void testCassandraArchiveSink() throws Exception {
         testSink(CassandraSinkTester.createTester(false), false);
     }

Reply via email to