srkukarni closed pull request #2502: Fix Handling of user defined nar 
sources/sinks
URL: https://github.com/apache/incubator-pulsar/pull/2502
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this is a foreign pull request (from a fork), GitHub does not show
its diff once it has been merged, so the diff is supplied below:

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index cdcad60eaa..344d9eefe9 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -27,6 +27,7 @@
 
 import io.netty.buffer.ByteBuf;
 
+import java.io.FileNotFoundException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -246,12 +247,12 @@ public void run() {
 
     private void loadJars() throws Exception {
 
-        if (jarFile.endsWith(".nar")) {
-            // The functions code is contained in a NAR archive
+        try {
+            // Let's first try to treat it as a nar archive
             
fnCache.registerFunctionInstanceWithArchive(instanceConfig.getFunctionId(), 
instanceConfig.getInstanceId(),
                     jarFile);
-        } else {
-            log.info("Loading JAR files for function {} from archive {}", 
instanceConfig, jarFile);
+        } catch (FileNotFoundException e) {
+            log.info("For Function {} Loading as NAR failed with {}; treating 
it as Jar instead", instanceConfig, e);
             // create the function class loader
             fnCache.registerFunctionInstance(
                     instanceConfig.getFunctionId(),
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 5cefc6a67c..7b3cd4f1b1 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -42,13 +42,7 @@
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
 import 
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
-import org.apache.pulsar.tests.integration.io.CassandraSinkTester;
-import org.apache.pulsar.tests.integration.io.JdbcSinkTester;
-import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo;
-import org.apache.pulsar.tests.integration.io.KafkaSinkTester;
-import org.apache.pulsar.tests.integration.io.KafkaSourceTester;
-import org.apache.pulsar.tests.integration.io.SinkTester;
-import org.apache.pulsar.tests.integration.io.SourceTester;
+import org.apache.pulsar.tests.integration.io.*;
 import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testng.Assert;
@@ -66,20 +60,25 @@
 
     @Test
     public void testKafkaSink() throws Exception {
-        testSink(new KafkaSinkTester());
+        testSink(new KafkaSinkTester(), true);
     }
 
     @Test
     public void testCassandraSink() throws Exception {
-        testSink(new CassandraSinkTester());
+        testSink(new CassandraSinkTester(), true);
+    }
+
+    @Test
+    public void testCassandraArchiveSink() throws Exception {
+        testSink(new CassandraSinkArchiveTester(), false);
     }
 
     @Test
     public void testJdbcSink() throws Exception {
-        testSink(new JdbcSinkTester());
+        testSink(new JdbcSinkTester(), true);
     }
 
-    private void testSink(SinkTester tester) throws Exception {
+    private void testSink(SinkTester tester, boolean builtin) throws Exception 
{
         tester.findSinkServiceContainer(pulsarCluster.getExternalServices());
 
         final String tenant = TopicName.PUBLIC_TENANT;
@@ -87,7 +86,7 @@ private void testSink(SinkTester tester) throws Exception {
         final String inputTopicName = "test-sink-connector-"
             + tester.getSinkType() + "-" + functionRuntimeType + 
"-input-topic-" + randomName(8);
         final String sinkName = "test-sink-connector-"
-            + tester.getSinkType() + "-" + functionRuntimeType + "-name-" + 
randomName(8);
+            + tester.getSinkType().name().toLowerCase() + "-" + 
functionRuntimeType + "-name-" + randomName(8);
         final int numMessages = 20;
 
         // prepare the testing environment for sink
@@ -97,7 +96,7 @@ private void testSink(SinkTester tester) throws Exception {
         submitSinkConnector(tester, tenant, namespace, sinkName, 
inputTopicName);
 
         // get sink info
-        getSinkInfoSuccess(tester, tenant, namespace, sinkName);
+        getSinkInfoSuccess(tester, tenant, namespace, sinkName, builtin);
 
         // get sink status
         getSinkStatus(tenant, namespace, sinkName);
@@ -105,7 +104,7 @@ private void testSink(SinkTester tester) throws Exception {
         // produce messages
         Map<String, String> kvs;
         if (tester instanceof JdbcSinkTester) {
-            kvs = produceSchemaMessagesToInputTopic(inputTopicName, 
numMessages, AvroSchema.of(Foo.class));
+            kvs = produceSchemaMessagesToInputTopic(inputTopicName, 
numMessages, AvroSchema.of(JdbcSinkTester.Foo.class));
         } else {
             kvs = produceMessagesToInputTopic(inputTopicName, numMessages);
         }
@@ -132,16 +131,31 @@ protected void submitSinkConnector(SinkTester tester,
                                        String namespace,
                                        String sinkName,
                                        String inputTopicName) throws Exception 
{
-        String[] commands = {
-            PulsarCluster.ADMIN_SCRIPT,
-            "sink", "create",
-            "--tenant", tenant,
-            "--namespace", namespace,
-            "--name", sinkName,
-            "--sink-type", tester.sinkType(),
-            "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
-            "--inputs", inputTopicName
-        };
+        String[] commands;
+        if (tester.getSinkType() != SinkTester.SinkType.UNDEFINED) {
+            commands = new String[] {
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "sink", "create",
+                    "--tenant", tenant,
+                    "--namespace", namespace,
+                    "--name", sinkName,
+                    "--sink-type", tester.sinkType().name().toLowerCase(),
+                    "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
+                    "--inputs", inputTopicName
+            };
+        } else {
+            commands = new String[] {
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "sink", "create",
+                    "--tenant", tenant,
+                    "--namespace", namespace,
+                    "--name", sinkName,
+                    "--archive", tester.getSinkArchive(),
+                    "--classname", tester.getSinkClassName(),
+                    "--sinkConfig", new Gson().toJson(tester.sinkConfig()),
+                    "--inputs", inputTopicName
+            };
+        }
         log.info("Run command : {}", StringUtils.join(commands, ' '));
         ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
         assertTrue(
@@ -152,7 +166,8 @@ protected void submitSinkConnector(SinkTester tester,
     protected void getSinkInfoSuccess(SinkTester tester,
                                       String tenant,
                                       String namespace,
-                                      String sinkName) throws Exception {
+                                      String sinkName,
+                                      boolean builtin) throws Exception {
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
             "functions",
@@ -163,10 +178,17 @@ protected void getSinkInfoSuccess(SinkTester tester,
         };
         ContainerExecResult result = 
pulsarCluster.getAnyWorker().execCmd(commands);
         log.info("Get sink info : {}", result.getStdout());
-        assertTrue(
-            result.getStdout().contains("\"builtin\": \"" + 
tester.getSinkType() + "\""),
-            result.getStdout()
-        );
+        if (builtin) {
+            assertTrue(
+                    result.getStdout().contains("\"builtin\": \"" + 
tester.getSinkType().name().toLowerCase() + "\""),
+                    result.getStdout()
+            );
+        } else {
+            assertTrue(
+                    result.getStdout().contains("\"className\": \"" + 
tester.getSinkClassName() + "\""),
+                    result.getStdout()
+            );
+        }
     }
 
     protected void getSinkStatus(String tenant, String namespace, String 
sinkName) throws Exception {
@@ -231,7 +253,7 @@ protected void getSinkStatus(String tenant, String 
namespace, String sinkName) t
         for (int i = 0; i < numMessages; i++) {
             String key = "key-" + i;
 
-            Foo obj = new Foo();
+            JdbcSinkTester.Foo obj = new JdbcSinkTester.Foo();
             obj.setField1("field1_" + i);
             obj.setField2("field2_" + i);
             obj.setField3(i);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java
new file mode 100644
index 0000000000..86c76894c5
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.integration.containers.CassandraContainer;
+import org.testcontainers.containers.GenericContainer;
+
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkState;
+import static 
org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * A tester for testing cassandra sink submitted as an archive.
+ */
+@Slf4j
+public class CassandraSinkArchiveTester extends SinkTester {
+
+    private static final String NAME = "cassandra";
+
+    private static final String ROOTS = "cassandra";
+    private static final String KEY = "key";
+    private static final String COLUMN = "col";
+
+    private final String keySpace;
+    private final String tableName;
+
+    private CassandraContainer cassandraCluster;
+
+    private Cluster cluster;
+    private Session session;
+
+    public CassandraSinkArchiveTester() {
+        
super("/pulsar/connectors/pulsar-io-cassandra-2.2.0-incubating-SNAPSHOT.nar", 
"org.apache.pulsar.io.cassandra.CassandraStringSink");
+
+        String suffix = randomName(8) + "_" + System.currentTimeMillis();
+        this.keySpace = "keySpace_" + suffix;
+        this.tableName = "tableName_" + suffix;
+
+        sinkConfig.put("roots", ROOTS);
+        sinkConfig.put("keyspace", keySpace);
+        sinkConfig.put("columnFamily", tableName);
+        sinkConfig.put("keyname", KEY);
+        sinkConfig.put("columnName", COLUMN);
+    }
+
+    @Override
+    public void findSinkServiceContainer(Map<String, GenericContainer<?>> 
containers) {
+        GenericContainer<?> container = containers.get(NAME);
+        checkState(container instanceof CassandraContainer,
+            "No kafka service found in the cluster");
+
+        this.cassandraCluster = (CassandraContainer) container;
+    }
+
+    @Override
+    public void prepareSink() {
+        // build the sink
+        cluster = Cluster.builder()
+            .addContactPoint("localhost")
+            .withPort(cassandraCluster.getCassandraPort())
+            .build();
+
+        // connect to the cluster
+        session = cluster.connect();
+        log.info("Connecting to cassandra cluster at localhost:{}", 
cassandraCluster.getCassandraPort());
+
+        String createKeySpace =
+            "CREATE KEYSPACE " + keySpace
+                + " WITH replication = {'class':'SimpleStrategy', 
'replication_factor':1}; ";
+        log.info(createKeySpace);
+        session.execute(createKeySpace);
+        session.execute("USE " + keySpace);
+
+        String createTable = "CREATE TABLE " + tableName
+            + "(" + KEY + " text PRIMARY KEY, "
+            + COLUMN + " text);";
+        log.info(createTable);
+        session.execute(createTable);
+    }
+
+    @Override
+    public void validateSinkResult(Map<String, String> kvs) {
+        String query = "SELECT * FROM " + tableName + ";";
+        ResultSet result = session.execute(query);
+        List<Row> rows = result.all();
+        assertEquals(kvs.size(), rows.size());
+        for (Row row : rows) {
+            String key = row.getString(KEY);
+            String value = row.getString(COLUMN);
+
+            String expectedValue = kvs.get(key);
+            assertNotNull(expectedValue);
+            assertEquals(expectedValue, value);
+        }
+    }
+}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
index e31de9f2ab..c9d3e5a11f 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
@@ -55,7 +55,7 @@
     private Session session;
 
     public CassandraSinkTester() {
-        super("cassandra");
+        super(SinkType.CASSANDRA);
 
         String suffix = randomName(8) + "_" + System.currentTimeMillis();
         this.keySpace = "keySpace_" + suffix;
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
index 6a102f1e39..e4aa401042 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
@@ -63,7 +63,7 @@
     private Connection connection;
 
     public JdbcSinkTester() {
-        super(NAME);
+        super(SinkType.JDBC);
 
         // container default value is test
         sinkConfig.put("userName", "test");
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
index 1cd58f26de..ff79e1a302 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
@@ -52,7 +52,7 @@
     private KafkaConsumer<String, String> kafkaConsumer;
 
     public KafkaSinkTester() {
-        super(NAME);
+        super(SinkType.KAFKA);
         String suffix = randomName(8) + "_" + System.currentTimeMillis();
         this.kafkaTopicName = "kafka_sink_topic_" + suffix;
 
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
index 3eee8219fa..098b8bf7c5 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
@@ -29,17 +29,35 @@
 @Getter
 public abstract class SinkTester {
 
-    protected final String sinkType;
+    public enum SinkType {
+        UNDEFINED,
+        CASSANDRA,
+        KAFKA,
+        JDBC
+    }
+
+    protected final SinkType sinkType;
+    protected final String sinkArchive;
+    protected final String sinkClassName;
     protected final Map<String, Object> sinkConfig;
 
-    public SinkTester(String sinkType) {
+    public SinkTester(SinkType sinkType) {
         this.sinkType = sinkType;
+        this.sinkArchive = null;
+        this.sinkClassName = null;
+        this.sinkConfig = Maps.newHashMap();
+    }
+
+    public SinkTester(String sinkArchive, String sinkClassName) {
+        this.sinkType = SinkType.UNDEFINED;
+        this.sinkArchive = sinkArchive;
+        this.sinkClassName = sinkClassName;
         this.sinkConfig = Maps.newHashMap();
     }
 
     public abstract void findSinkServiceContainer(Map<String, 
GenericContainer<?>> externalServices);
 
-    public String sinkType() {
+    public SinkType sinkType() {
         return sinkType;
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to