This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6d58114 Fix Handling of user defined nar sources/sinks (#2502)
6d58114 is described below
commit 6d5811445c150b728a83194b2d3868333d1bfaa3
Author: Sanjeev Kulkarni <[email protected]>
AuthorDate: Wed Sep 5 09:05:40 2018 -0700
Fix Handling of user defined nar sources/sinks (#2502)
* Fix handling of nar archives
* Adress review comments
* Added a integration test to test archive
* Fixed build
* Fix build
* Fix integratin test
---
.../functions/instance/JavaInstanceRunnable.java | 9 +--
.../integration/functions/PulsarFunctionsTest.java | 82 ++++++++++++++--------
...Tester.java => CassandraSinkArchiveTester.java} | 8 +--
.../tests/integration/io/CassandraSinkTester.java | 2 +-
.../tests/integration/io/JdbcSinkTester.java | 2 +-
.../tests/integration/io/KafkaSinkTester.java | 2 +-
.../pulsar/tests/integration/io/SinkTester.java | 24 ++++++-
7 files changed, 85 insertions(+), 44 deletions(-)
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index cdcad60..344d9ee 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -27,6 +27,7 @@ import com.google.gson.reflect.TypeToken;
import io.netty.buffer.ByteBuf;
+import java.io.FileNotFoundException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -246,12 +247,12 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
private void loadJars() throws Exception {
- if (jarFile.endsWith(".nar")) {
- // The functions code is contained in a NAR archive
+ try {
+ // Let's first try to treat it as a nar archive
fnCache.registerFunctionInstanceWithArchive(instanceConfig.getFunctionId(),
instanceConfig.getInstanceId(),
jarFile);
- } else {
- log.info("Loading JAR files for function {} from archive {}",
instanceConfig, jarFile);
+ } catch (FileNotFoundException e) {
+ log.info("For Function {} Loading as NAR failed with {}; treating
it as Jar instead", instanceConfig, e);
// create the function class loader
fnCache.registerFunctionInstance(
instanceConfig.getFunctionId(),
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 5cefc6a..7b3cd4f 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -42,13 +42,7 @@ import
org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
import
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
-import org.apache.pulsar.tests.integration.io.CassandraSinkTester;
-import org.apache.pulsar.tests.integration.io.JdbcSinkTester;
-import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo;
-import org.apache.pulsar.tests.integration.io.KafkaSinkTester;
-import org.apache.pulsar.tests.integration.io.KafkaSourceTester;
-import org.apache.pulsar.tests.integration.io.SinkTester;
-import org.apache.pulsar.tests.integration.io.SourceTester;
+import org.apache.pulsar.tests.integration.io.*;
import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testng.Assert;
@@ -66,20 +60,25 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
@Test
public void testKafkaSink() throws Exception {
- testSink(new KafkaSinkTester());
+ testSink(new KafkaSinkTester(), true);
}
@Test
public void testCassandraSink() throws Exception {
- testSink(new CassandraSinkTester());
+ testSink(new CassandraSinkTester(), true);
+ }
+
+ @Test
+ public void testCassandraArchiveSink() throws Exception {
+ testSink(new CassandraSinkArchiveTester(), false);
}
@Test
public void testJdbcSink() throws Exception {
- testSink(new JdbcSinkTester());
+ testSink(new JdbcSinkTester(), true);
}
- private void testSink(SinkTester tester) throws Exception {
+ private void testSink(SinkTester tester, boolean builtin) throws Exception
{
tester.findSinkServiceContainer(pulsarCluster.getExternalServices());
final String tenant = TopicName.PUBLIC_TENANT;
@@ -87,7 +86,7 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
final String inputTopicName = "test-sink-connector-"
+ tester.getSinkType() + "-" + functionRuntimeType +
"-input-topic-" + randomName(8);
final String sinkName = "test-sink-connector-"
- + tester.getSinkType() + "-" + functionRuntimeType + "-name-" +
randomName(8);
+ + tester.getSinkType().name().toLowerCase() + "-" +
functionRuntimeType + "-name-" + randomName(8);
final int numMessages = 20;
// prepare the testing environment for sink
@@ -97,7 +96,7 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
submitSinkConnector(tester, tenant, namespace, sinkName,
inputTopicName);
// get sink info
- getSinkInfoSuccess(tester, tenant, namespace, sinkName);
+ getSinkInfoSuccess(tester, tenant, namespace, sinkName, builtin);
// get sink status
getSinkStatus(tenant, namespace, sinkName);
@@ -105,7 +104,7 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
// produce messages
Map<String, String> kvs;
if (tester instanceof JdbcSinkTester) {
- kvs = produceSchemaMessagesToInputTopic(inputTopicName,
numMessages, AvroSchema.of(Foo.class));
+ kvs = produceSchemaMessagesToInputTopic(inputTopicName,
numMessages, AvroSchema.of(JdbcSinkTester.Foo.class));
} else {
kvs = produceMessagesToInputTopic(inputTopicName, numMessages);
}
@@ -132,16 +131,31 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
String namespace,
String sinkName,
String inputTopicName) throws Exception
{
- String[] commands = {
- PulsarCluster.ADMIN_SCRIPT,
- "sink", "create",
- "--tenant", tenant,
- "--namespace", namespace,
- "--name", sinkName,
- "--sink-type", tester.sinkType(),
- "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
- "--inputs", inputTopicName
- };
+ String[] commands;
+ if (tester.getSinkType() != SinkTester.SinkType.UNDEFINED) {
+ commands = new String[] {
+ PulsarCluster.ADMIN_SCRIPT,
+ "sink", "create",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName,
+ "--sink-type", tester.sinkType().name().toLowerCase(),
+ "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
+ "--inputs", inputTopicName
+ };
+ } else {
+ commands = new String[] {
+ PulsarCluster.ADMIN_SCRIPT,
+ "sink", "create",
+ "--tenant", tenant,
+ "--namespace", namespace,
+ "--name", sinkName,
+ "--archive", tester.getSinkArchive(),
+ "--classname", tester.getSinkClassName(),
+ "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
+ "--inputs", inputTopicName
+ };
+ }
log.info("Run command : {}", StringUtils.join(commands, ' '));
ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
assertTrue(
@@ -152,7 +166,8 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
protected void getSinkInfoSuccess(SinkTester tester,
String tenant,
String namespace,
- String sinkName) throws Exception {
+ String sinkName,
+ boolean builtin) throws Exception {
String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
"functions",
@@ -163,10 +178,17 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
};
ContainerExecResult result =
pulsarCluster.getAnyWorker().execCmd(commands);
log.info("Get sink info : {}", result.getStdout());
- assertTrue(
- result.getStdout().contains("\"builtin\": \"" +
tester.getSinkType() + "\""),
- result.getStdout()
- );
+ if (builtin) {
+ assertTrue(
+ result.getStdout().contains("\"builtin\": \"" +
tester.getSinkType().name().toLowerCase() + "\""),
+ result.getStdout()
+ );
+ } else {
+ assertTrue(
+ result.getStdout().contains("\"className\": \"" +
tester.getSinkClassName() + "\""),
+ result.getStdout()
+ );
+ }
}
protected void getSinkStatus(String tenant, String namespace, String
sinkName) throws Exception {
@@ -231,7 +253,7 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
for (int i = 0; i < numMessages; i++) {
String key = "key-" + i;
- Foo obj = new Foo();
+ JdbcSinkTester.Foo obj = new JdbcSinkTester.Foo();
obj.setField1("field1_" + i);
obj.setField2("field2_" + i);
obj.setField3(i);
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java
similarity index 93%
copy from
tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
copy to
tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java
index e31de9f..86c7689 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java
@@ -35,10 +35,10 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
- * A tester for testing cassandra sink.
+ * A tester for testing cassandra sink submitted as an archive.
*/
@Slf4j
-public class CassandraSinkTester extends SinkTester {
+public class CassandraSinkArchiveTester extends SinkTester {
private static final String NAME = "cassandra";
@@ -54,8 +54,8 @@ public class CassandraSinkTester extends SinkTester {
private Cluster cluster;
private Session session;
- public CassandraSinkTester() {
- super("cassandra");
+ public CassandraSinkArchiveTester() {
+
super("/pulsar/connectors/pulsar-io-cassandra-2.2.0-incubating-SNAPSHOT.nar",
"org.apache.pulsar.io.cassandra.CassandraStringSink");
String suffix = randomName(8) + "_" + System.currentTimeMillis();
this.keySpace = "keySpace_" + suffix;
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
index e31de9f..c9d3e5a 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
@@ -55,7 +55,7 @@ public class CassandraSinkTester extends SinkTester {
private Session session;
public CassandraSinkTester() {
- super("cassandra");
+ super(SinkType.CASSANDRA);
String suffix = randomName(8) + "_" + System.currentTimeMillis();
this.keySpace = "keySpace_" + suffix;
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
index 6a102f1..e4aa401 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
@@ -63,7 +63,7 @@ public class JdbcSinkTester extends SinkTester {
private Connection connection;
public JdbcSinkTester() {
- super(NAME);
+ super(SinkType.JDBC);
// container default value is test
sinkConfig.put("userName", "test");
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
index 1cd58f2..ff79e1a 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
@@ -52,7 +52,7 @@ public class KafkaSinkTester extends SinkTester {
private KafkaConsumer<String, String> kafkaConsumer;
public KafkaSinkTester() {
- super(NAME);
+ super(SinkType.KAFKA);
String suffix = randomName(8) + "_" + System.currentTimeMillis();
this.kafkaTopicName = "kafka_sink_topic_" + suffix;
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
index 3eee821..098b8bf 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
@@ -29,17 +29,35 @@ import org.testng.collections.Maps;
@Getter
public abstract class SinkTester {
- protected final String sinkType;
+ public enum SinkType {
+ UNDEFINED,
+ CASSANDRA,
+ KAFKA,
+ JDBC
+ }
+
+ protected final SinkType sinkType;
+ protected final String sinkArchive;
+ protected final String sinkClassName;
protected final Map<String, Object> sinkConfig;
- public SinkTester(String sinkType) {
+ public SinkTester(SinkType sinkType) {
this.sinkType = sinkType;
+ this.sinkArchive = null;
+ this.sinkClassName = null;
+ this.sinkConfig = Maps.newHashMap();
+ }
+
+ public SinkTester(String sinkArchive, String sinkClassName) {
+ this.sinkType = SinkType.UNDEFINED;
+ this.sinkArchive = sinkArchive;
+ this.sinkClassName = sinkClassName;
this.sinkConfig = Maps.newHashMap();
}
public abstract void findSinkServiceContainer(Map<String,
GenericContainer<?>> externalServices);
- public String sinkType() {
+ public SinkType sinkType() {
return sinkType;
}