This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 641887c859a [fix][io] KCA connectors: fix missing runtime dependencies 
(#18370)
641887c859a is described below

commit 641887c859ab5b5bb180c74c36e67d6f1adc5fd0
Author: Nicolò Boschi <[email protected]>
AuthorDate: Tue Nov 8 13:41:33 2022 +0100

    [fix][io] KCA connectors: fix missing runtime dependencies (#18370)
    
    * [fix][io] KCA connectors: fix missing runtime dependencies
    
    * fix compile
    
    * fix compile
    
    * fix noclassdeffound error
    
    * Fix dep and reduce debezium also
    
    * Fix debezium unit test
    
    * improve object mapper
---
 pulsar-functions/runtime-all/pom.xml               |  1 +
 pulsar-io/debezium/core/pom.xml                    | 25 +++++++---
 .../pulsar/io/debezium/PulsarDatabaseHistory.java  | 14 ++++--
 pulsar-io/debezium/mongodb/pom.xml                 |  7 +++
 pulsar-io/debezium/mssql/pom.xml                   |  7 +++
 pulsar-io/debezium/mysql/pom.xml                   |  6 +++
 pulsar-io/debezium/oracle/pom.xml                  |  7 +++
 pulsar-io/debezium/postgres/pom.xml                |  7 +++
 pulsar-io/kafka-connect-adaptor/pom.xml            | 58 ++++++++++++++++++----
 .../io/kafka/connect/PulsarOffsetBackingStore.java | 15 +++++-
 .../tests/integration/io/PulsarIOTestBase.java     |  5 ++
 .../integration/io/sinks/PulsarSinksTest.java      |  5 ++
 .../debezium/PulsarDebeziumOracleSourceTest.java   |  5 ++
 .../debezium/PulsarDebeziumSourcesTest.java        |  5 ++
 14 files changed, 146 insertions(+), 21 deletions(-)

diff --git a/pulsar-functions/runtime-all/pom.xml 
b/pulsar-functions/runtime-all/pom.xml
index e53f7aa6c2c..bf0b3d36eb1 100644
--- a/pulsar-functions/runtime-all/pom.xml
+++ b/pulsar-functions/runtime-all/pom.xml
@@ -42,6 +42,7 @@
     6. log4j-api
     7. log4j-core
     8. AVRO
+    9. protobuf-java
   -->
 
   <artifactId>pulsar-functions-runtime-all</artifactId>
diff --git a/pulsar-io/debezium/core/pom.xml b/pulsar-io/debezium/core/pom.xml
index 5b4b7f4deb8..f3701887faf 100644
--- a/pulsar-io/debezium/core/pom.xml
+++ b/pulsar-io/debezium/core/pom.xml
@@ -36,12 +36,7 @@
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-io-core</artifactId>
       <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-client-original</artifactId>
-      <version>${project.version}</version>
+      <scope>provided</scope>
     </dependency>
 
     <dependency>
@@ -50,6 +45,17 @@
       <version>${debezium.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-io-kafka-connect-adaptor</artifactId>
@@ -103,6 +109,13 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-original</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
 </project>
diff --git 
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
 
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
index 9669a6e131b..7ca0d309cf9 100644
--- 
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
+++ 
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
@@ -18,9 +18,9 @@
  */
 package org.apache.pulsar.io.debezium;
 
-import static org.apache.commons.lang.StringUtils.isBlank;
-import static 
org.apache.pulsar.io.common.IOConfigUtils.loadConfigFromJsonString;
+import static org.apache.commons.lang3.StringUtils.isBlank;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import io.debezium.annotation.ThreadSafe;
 import io.debezium.config.Configuration;
@@ -33,6 +33,7 @@ import io.debezium.relational.history.DatabaseHistoryListener;
 import io.debezium.relational.history.HistoryRecord;
 import io.debezium.relational.history.HistoryRecordComparator;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -99,6 +100,7 @@ public final class PulsarDatabaseHistory extends 
AbstractDatabaseHistory {
         DatabaseHistory.NAME,
         READER_CONFIG);
 
+    private final ObjectMapper mapper = new ObjectMapper();
     private final DocumentReader reader = DocumentReader.defaultReader();
     private String topicName;
     private Map<String, Object> readerConfigMap = new HashMap<>();
@@ -120,7 +122,13 @@ public final class PulsarDatabaseHistory extends 
AbstractDatabaseHistory {
         }
         this.topicName = config.getString(TOPIC);
         try {
-            this.readerConfigMap = 
loadConfigFromJsonString(config.getString(READER_CONFIG));
+            final String configString = config.getString(READER_CONFIG);
+            if (configString == null) {
+                this.readerConfigMap = Collections.emptyMap();
+            } else {
+                this.readerConfigMap = mapper.readValue(configString, 
Map.class);
+            }
+
         } catch (JsonProcessingException exception) {
             log.warn("The provided reader configs are invalid, "
                     + "will not passing any extra config to the reader 
builder.", exception);
diff --git a/pulsar-io/debezium/mongodb/pom.xml 
b/pulsar-io/debezium/mongodb/pom.xml
index 1650fe2a650..e01948d1125 100644
--- a/pulsar-io/debezium/mongodb/pom.xml
+++ b/pulsar-io/debezium/mongodb/pom.xml
@@ -32,6 +32,13 @@
 
     <dependencies>
 
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-io-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <dependency>
             <groupId>${project.groupId}</groupId>
             <artifactId>pulsar-io-debezium-core</artifactId>
diff --git a/pulsar-io/debezium/mssql/pom.xml b/pulsar-io/debezium/mssql/pom.xml
index 6f5fb6dd5d8..62da307f4b8 100644
--- a/pulsar-io/debezium/mssql/pom.xml
+++ b/pulsar-io/debezium/mssql/pom.xml
@@ -32,6 +32,13 @@
 
   <dependencies>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-core</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-io-debezium-core</artifactId>
diff --git a/pulsar-io/debezium/mysql/pom.xml b/pulsar-io/debezium/mysql/pom.xml
index fa0c7b79535..d3c18122086 100644
--- a/pulsar-io/debezium/mysql/pom.xml
+++ b/pulsar-io/debezium/mysql/pom.xml
@@ -31,6 +31,12 @@
   <name>Pulsar IO :: Debezium :: mysql</name>
 
   <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-core</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
 
     <dependency>
       <groupId>${project.groupId}</groupId>
diff --git a/pulsar-io/debezium/oracle/pom.xml 
b/pulsar-io/debezium/oracle/pom.xml
index 030a72b09b4..3da3b8d9ac2 100644
--- a/pulsar-io/debezium/oracle/pom.xml
+++ b/pulsar-io/debezium/oracle/pom.xml
@@ -32,6 +32,13 @@
 
   <dependencies>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-core</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-io-debezium-core</artifactId>
diff --git a/pulsar-io/debezium/postgres/pom.xml 
b/pulsar-io/debezium/postgres/pom.xml
index 7c8d74a229f..2b9596c9f88 100644
--- a/pulsar-io/debezium/postgres/pom.xml
+++ b/pulsar-io/debezium/postgres/pom.xml
@@ -32,6 +32,13 @@
 
   <dependencies>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-core</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-io-debezium-core</artifactId>
diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml 
b/pulsar-io/kafka-connect-adaptor/pom.xml
index af62fbb6126..19aa1f522d0 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -36,18 +36,17 @@
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-io-core</artifactId>
       <version>${project.version}</version>
+      <scope>provided</scope>
     </dependency>
 
     <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-io-common</artifactId>
-      <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>*</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
     </dependency>
 
     <dependency>
@@ -82,11 +81,28 @@
       <version>${kafka-client.version}</version>
     </dependency>
 
+    <!-- pulsar-client is only needed for MessageId conversion (for seeking), 
commons-lang3 and Netty buffer manipulation -->
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-client-original</artifactId>
       <version>${project.version}</version>
-      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-buffer</artifactId>
+      <version>${netty.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
     </dependency>
 
     <dependency>
@@ -105,6 +121,9 @@
       </exclusions>
     </dependency>
 
+
+
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-broker</artifactId>
@@ -134,11 +153,30 @@
       <type>test-jar</type>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>${avro.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
     <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.asynchttpclient</groupId>
+      <artifactId>async-http-client</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.typesafe.netty</groupId>
+      <artifactId>netty-reactive-streams</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
+
 </project>
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
index ed15ab62ef0..02f315af68f 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
@@ -20,15 +20,17 @@ package org.apache.pulsar.io.kafka.connect;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.commons.lang.StringUtils.isBlank;
-import static 
org.apache.pulsar.io.common.IOConfigUtils.loadConfigFromJsonString;
+import static org.apache.commons.lang3.StringUtils.isBlank;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -53,6 +55,7 @@ import org.apache.pulsar.client.api.Schema;
 @Slf4j
 public class PulsarOffsetBackingStore implements OffsetBackingStore {
 
+    private final ObjectMapper mapper = new ObjectMapper();
     private final Map<ByteBuffer, ByteBuffer> data = new ConcurrentHashMap<>();
     private PulsarClient client;
     private String topic;
@@ -249,4 +252,12 @@ public class PulsarOffsetBackingStore implements 
OffsetBackingStore {
             }
         });
     }
+
+    private Map<String, Object> loadConfigFromJsonString(String config) throws 
JsonProcessingException {
+        if (!isBlank(config)) {
+            return mapper.readValue(config, new TypeReference<>() {});
+        } else {
+            return Collections.emptyMap();
+        }
+    }
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestBase.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestBase.java
index e33bdd8e54d..92aa6fd0134 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestBase.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestBase.java
@@ -23,10 +23,15 @@ import 
org.apache.pulsar.tests.integration.io.sinks.PulsarIOSinkRunner;
 import org.apache.pulsar.tests.integration.io.sinks.SinkTester;
 import org.apache.pulsar.tests.integration.io.sources.PulsarIOSourceRunner;
 import org.apache.pulsar.tests.integration.io.sources.SourceTester;
+import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.testcontainers.containers.GenericContainer;
 
 public abstract class PulsarIOTestBase extends PulsarFunctionsTestBase {
 
+    public PulsarIOTestBase(FunctionRuntimeType functionRuntimeType) {
+        super(functionRuntimeType);
+    }
+
     @SuppressWarnings({"unchecked", "rawtypes"})
     protected void testSink(SinkTester tester, boolean builtin) throws 
Exception {
         tester.startServiceContainer(pulsarCluster);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
index 1775b693f18..42630741833 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
@@ -23,12 +23,17 @@ import 
org.apache.pulsar.tests.integration.io.PulsarIOTestBase;
 import org.apache.pulsar.tests.integration.io.RabbitMQSinkTester;
 import org.apache.pulsar.tests.integration.io.RabbitMQSourceTester;
 import org.apache.pulsar.tests.integration.io.sources.KafkaSourceTester;
+import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 
 public class PulsarSinksTest extends PulsarIOTestBase {
 
+    public PulsarSinksTest() {
+        super(FunctionRuntimeType.PROCESS);
+    }
+
     @DataProvider(name = "withSchema")
     public Object[][] withSchema() {
         return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}};
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumOracleSourceTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumOracleSourceTest.java
index 10e5d6dbdaf..76413d0843c 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumOracleSourceTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumOracleSourceTest.java
@@ -29,6 +29,7 @@ import 
org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import 
org.apache.pulsar.tests.integration.containers.DebeziumOracleDbContainer;
 import org.apache.pulsar.tests.integration.io.PulsarIOTestBase;
+import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.testng.annotations.Test;
 
 import java.util.concurrent.atomic.AtomicInteger;
@@ -38,6 +39,10 @@ public class PulsarDebeziumOracleSourceTest extends 
PulsarIOTestBase {
 
     protected final AtomicInteger testId = new AtomicInteger(0);
 
+    public PulsarDebeziumOracleSourceTest() {
+        super(FunctionRuntimeType.PROCESS);
+    }
+
     @Test(groups = "source", timeOut = 1800000)
     public void testDebeziumOracleDbSource() throws Exception{
         
testDebeziumOracleDbConnect("org.apache.kafka.connect.json.JsonConverter", 
true);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
index e0e48101c99..9da1f10e74a 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
@@ -33,6 +33,7 @@ import 
org.apache.pulsar.tests.integration.containers.DebeziumMsSqlContainer;
 import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
 import 
org.apache.pulsar.tests.integration.containers.DebeziumPostgreSqlContainer;
 import org.apache.pulsar.tests.integration.io.PulsarIOTestBase;
+import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.testng.annotations.Test;
 
 import lombok.Cleanup;
@@ -43,6 +44,10 @@ public class PulsarDebeziumSourcesTest extends 
PulsarIOTestBase {
 
     protected final AtomicInteger testId = new AtomicInteger(0);
 
+    public PulsarDebeziumSourcesTest() {
+        super(FunctionRuntimeType.PROCESS);
+    }
+
     @Test(groups = "source")
     public void testDebeziumMySqlSourceJson() throws Exception {
         
testDebeziumMySqlConnect("org.apache.kafka.connect.json.JsonConverter", true, 
false);

Reply via email to