This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 4c6c98b43b5 [fix][test] Enable Cassandra sink tests (#18430)
4c6c98b43b5 is described below
commit 4c6c98b43b520b41f555acf18f3dfe596dded97e
Author: tison <[email protected]>
AuthorDate: Mon Nov 14 17:37:40 2022 +0800
[fix][test] Enable Cassandra sink tests (#18430)
* [fix][test] Enable Cassandra sink tests
This is based on https://github.com/apache/pulsar/pull/4036.
Signed-off-by: tison <[email protected]>
Co-authored-by: Sanjeev Kulkarni <[email protected]>
* avoid class not found
Following
https://docs.datastax.com/en/developer/java-driver/3.5/manual/metrics/#metrics-4-compatibility.
Signed-off-by: tison <[email protected]>
Co-authored-by: Sanjeev Kulkarni <[email protected]>
---
.../pulsar/io/cassandra/CassandraAbstractSink.java | 2 +-
.../integration/io/sinks/CassandraSinkTester.java | 19 +++++++++++--------
.../integration/io/sinks/PulsarIOSinkRunner.java | 4 ++--
.../tests/integration/io/sinks/PulsarSinksTest.java | 4 ++--
4 files changed, 16 insertions(+), 13 deletions(-)
diff --git
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
index 7a872ff6504..11a6684b49d 100644
---
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
+++
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
@@ -99,7 +99,7 @@ public abstract class CassandraAbstractSink<K, V> implements
Sink<byte[]> {
b.withPort(Integer.parseInt(hostPort[1]));
}
}
- cluster = b.build();
+ cluster = b.withoutJMXReporting().build();
session = cluster.connect();
session.execute("USE " + cassandraSinkConfig.getKeyspace());
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/CassandraSinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/CassandraSinkTester.java
index d545ddfd08f..f062124018d 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/CassandraSinkTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/CassandraSinkTester.java
@@ -23,6 +23,7 @@ import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.tests.integration.containers.CassandraContainer;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
@@ -52,6 +53,7 @@ public class CassandraSinkTester extends
SinkTester<CassandraContainer> {
private static final String ROOTS = "cassandra";
private static final String KEY = "key";
private static final String COLUMN = "col";
+ private static final String ARCHIVE =
"/pulsar/connectors/pulsar-io-cassandra-" + PulsarVersion.getVersion() + ".nar";
private final String keySpace;
private final String tableName;
@@ -60,7 +62,7 @@ public class CassandraSinkTester extends
SinkTester<CassandraContainer> {
private Session session;
private CassandraSinkTester() {
- super(NAME,
"/pulsar/connectors/pulsar-io-cassandra-2.2.0-incubating-SNAPSHOT.nar",
"org.apache.pulsar.io.cassandra.CassandraStringSink");
+ super(NAME, ARCHIVE,
"org.apache.pulsar.io.cassandra.CassandraStringSink");
String suffix = randomName(8) + "_" + System.currentTimeMillis();
this.keySpace = "keySpace_" + suffix;
@@ -96,24 +98,25 @@ public class CassandraSinkTester extends
SinkTester<CassandraContainer> {
public void prepareSink() {
// build the sink
cluster = Cluster.builder()
- .addContactPoint("localhost")
- .withPort(serviceContainer.getCassandraPort())
- .build();
+ .addContactPoint("localhost")
+ .withPort(serviceContainer.getCassandraPort())
+ .withoutJMXReporting()
+ .build();
// connect to the cluster
session = cluster.connect();
log.info("Connecting to cassandra cluster at localhost:{}",
serviceContainer.getCassandraPort());
String createKeySpace =
- "CREATE KEYSPACE " + keySpace
- + " WITH replication = {'class':'SimpleStrategy',
'replication_factor':1}; ";
+ "CREATE KEYSPACE " + keySpace
+ + " WITH replication = {'class':'SimpleStrategy',
'replication_factor':1}; ";
log.info(createKeySpace);
session.execute(createKeySpace);
session.execute("USE " + keySpace);
String createTable = "CREATE TABLE " + tableName
- + "(" + KEY + " text PRIMARY KEY, "
- + COLUMN + " text);";
+ + "(" + KEY + " text PRIMARY KEY, "
+ + COLUMN + " text);";
log.info(createTable);
session.execute(createTable);
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java
index 5fa96d77565..341d723adcc 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarIOSinkRunner.java
@@ -55,7 +55,7 @@ public class PulsarIOSinkRunner extends PulsarIOTestRunner {
super(cluster, functionRuntimeType);
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
+ @SuppressWarnings({ "rawtypes" })
public <T extends GenericContainer> void runSinkTester(SinkTester<T>
tester, boolean builtin) throws Exception {
final String tenant = TopicName.PUBLIC_TENANT;
final String namespace = TopicName.DEFAULT_NAMESPACE;
@@ -201,7 +201,7 @@ public class PulsarIOSinkRunner extends PulsarIOTestRunner {
} else {
commands = new String[] {
PulsarCluster.ADMIN_SCRIPT,
- "sink", "create",
+ "sink", "update",
"--tenant", tenant,
"--namespace", namespace,
"--name", sinkName,
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
index bec9c99e8a8..4b4fa6449e9 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
@@ -40,12 +40,12 @@ public class PulsarSinksTest extends PulsarIOTestBase {
testSink(new KafkaSinkTester(kafkaContainerName), true, new
KafkaSourceTester(kafkaContainerName));
}
- @Test(enabled = false, groups = "sink")
+ @Test(groups = "sink")
public void testCassandraSink() throws Exception {
testSink(CassandraSinkTester.createTester(true), true);
}
- @Test(enabled = false, groups = "sink")
+ @Test(groups = "sink")
public void testCassandraArchiveSink() throws Exception {
testSink(CassandraSinkTester.createTester(false), false);
}