sijie closed pull request #2440: Issue 2313: create a JDBC sink connector
URL: https://github.com/apache/incubator-pulsar/pull/2440
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a foreign pull request once it has been merged,
the diff is reproduced below for the sake of provenance:

diff --git a/distribution/io/src/assemble/io.xml 
b/distribution/io/src/assemble/io.xml
index 8cf7fce208..bb75e84e34 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -74,6 +74,11 @@
       <outputDirectory>connectors</outputDirectory>
       <fileMode>644</fileMode>
     </file>
+    <file>
+      <source>${basedir}/../../pulsar-io/jdbc/target/pulsar-io-jdbc-${project.version}.nar</source>
+      <outputDirectory>connectors</outputDirectory>
+      <fileMode>644</fileMode>
+    </file>
     <file>
       
<source>${basedir}/../../pulsar-io/data-genenator/target/pulsar-io-data-generator-${project.version}.nar</source>
       <outputDirectory>connectors</outputDirectory>
diff --git a/pom.xml b/pom.xml
index 4bb3cf63bb..c0dc20d492 100644
--- a/pom.xml
+++ b/pom.xml
@@ -166,6 +166,8 @@ flexible messaging model and an intuitive client 
API.</description>
     <aws-sdk.version>1.11.297</aws-sdk.version>
     <avro.version>1.8.2</avro.version>
     <jclouds.version>2.1.1</jclouds.version>
+    <sqlite-jdbc.version>3.8.11.2</sqlite-jdbc.version>
+    <mysql-jdbc.version>8.0.11</mysql-jdbc.version>
     <presto.version>0.206</presto.version>
 
     <!-- test dependencies -->
@@ -818,6 +820,11 @@ flexible messaging model and an intuitive client 
API.</description>
         <artifactId>kafka</artifactId>
         <version>${testcontainers.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.testcontainers</groupId>
+        <artifactId>mysql</artifactId>
+        <version>${testcontainers.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.arquillian.cube</groupId>
         <artifactId>arquillian-cube-docker</artifactId>
@@ -1083,7 +1090,7 @@ flexible messaging model and an intuitive client 
API.</description>
             
<exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude>
             
<exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
             <exclude>bin/proto/MLDataFormats_pb2.py</exclude>
-            
+
             <!-- pulasr-io-connector kinesis : auto generated files from 
flatbuffer schema -->
             
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java</exclude>
             
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java</exclude>
diff --git a/pulsar-io/jdbc/lombok.config b/pulsar-io/jdbc/lombok.config
new file mode 100644
index 0000000000..9a9adee272
--- /dev/null
+++ b/pulsar-io/jdbc/lombok.config
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+## This file is to fix the conflict with jackson error like this:
+##    com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of ...
+lombok.anyConstructor.addConstructorProperties=true
+config.stopBubbling = true
diff --git a/pulsar-io/jdbc/pom.xml b/pulsar-io/jdbc/pom.xml
new file mode 100644
index 0000000000..eed85883ec
--- /dev/null
+++ b/pulsar-io/jdbc/pom.xml
@@ -0,0 +1,96 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-io</artifactId>
+    <version>2.2.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-io-jdbc</artifactId>
+  <name>Pulsar IO :: Jdbc</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>${avro.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-functions-instance</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.xerial</groupId>
+      <artifactId>sqlite-jdbc</artifactId>
+      <version>${sqlite-jdbc.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>mysql</groupId>
+      <artifactId>mysql-connector-java</artifactId>
+      <version>${mysql-jdbc.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-original</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+
+</project>
diff --git 
a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java 
b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
new file mode 100644
index 0000000000..425fb57ac1
--- /dev/null
+++ 
b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static 
jersey.repackaged.com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+/**
+ * A Simple abstract class for Jdbc sink
+ * Users need to implement extractKeyValue function to use this sink
+ */
+@Slf4j
+public abstract class JdbcAbstractSink<T> implements Sink<T> {
+    // ----- Runtime fields
+    private JdbcSinkConfig jdbcSinkConfig;
+    @Getter
+    private Connection connection;
+    private String jdbcUrl;
+    private String tableName;
+
+    private JdbcUtils.TableId tableId;
+    private PreparedStatement insertStatement;
+
+    // TODO: turn to getSchema from 
SinkContext.getTopicSchema.getSchema(inputTopic)
+    protected String schema;
+    protected JdbcUtils.TableDefinition tableDefinition;
+
+    // for flush
+    private List<Record<T>> incomingList;
+    private List<Record<T>> swapList;
+    private AtomicBoolean isFlushing;
+    private int timeoutMs;
+    private int batchSize;
+    private ScheduledExecutorService flushExecutor;
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
+        jdbcSinkConfig = JdbcSinkConfig.load(config);
+
+        jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+        if (jdbcSinkConfig.getJdbcUrl() == null) {
+            throw new IllegalArgumentException("Required jdbc Url not set.");
+        }
+
+        Properties properties = new Properties();
+        String username = jdbcSinkConfig.getUserName();
+        String password = jdbcSinkConfig.getPassword();
+        if (username != null) {
+            properties.setProperty("user", username);
+        }
+        if (password != null) {
+            properties.setProperty("password", password);
+        }
+
+        connection = JdbcUtils.getConnection(jdbcUrl, properties);
+        connection.setAutoCommit(false);
+        log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, 
connection.getAutoCommit());
+
+        schema = jdbcSinkConfig.getSchema();
+        tableName = jdbcSinkConfig.getTableName();
+        tableId = JdbcUtils.getTableId(connection, tableName);
+        tableDefinition = JdbcUtils.getTableDefinition(connection, tableId);
+        insertStatement = JdbcUtils.buildInsertStatement(connection, 
JdbcUtils.buildInsertSql(tableDefinition));
+
+        timeoutMs = jdbcSinkConfig.getTimeoutMs();
+        batchSize = jdbcSinkConfig.getBatchSize();
+        incomingList = Lists.newArrayList();
+        swapList = Lists.newArrayList();
+        isFlushing = new AtomicBoolean(false);
+
+        flushExecutor = Executors.newScheduledThreadPool(1);
+        flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, 
TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (!connection.getAutoCommit()) {
+            connection.commit();
+        }
+        flushExecutor.shutdown();
+        if (connection != null) {
+            connection.close();
+        }
+        log.info("Closed jdbc connection: {}", jdbcUrl);
+    }
+
+    @Override
+    public void write(Record<T> record) throws Exception {
+        int number;
+        synchronized (incomingList) {
+            incomingList.add(record);
+            number = incomingList.size();
+        }
+
+        if (number == batchSize) {
+            flushExecutor.schedule(() -> flush(), 0, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    // bind value with a PreparedStetement
+    public abstract void bindValue(
+        PreparedStatement statement,
+        Record<T> message) throws Exception;
+
+
+    private void flush() {
+        // if not in flushing state, do flush, else return;
+        if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
+            if (log.isDebugEnabled()) {
+                log.debug("Starting flush, queue size: {}", 
incomingList.size());
+            }
+            checkState(swapList.isEmpty(),
+                "swapList should be empty since last flush. swapList.size: " + 
swapList.size());
+
+            synchronized (incomingList) {
+                List<Record<T>> tmpList;
+                swapList.clear();
+
+                tmpList = swapList;
+                swapList = incomingList;
+                incomingList = tmpList;
+            }
+
+            int updateCount = 0;
+            boolean noInfo = false;
+            try {
+                // bind each record value
+                for (Record<T> record : swapList) {
+                    bindValue(insertStatement, record);
+                    insertStatement.addBatch();
+                    record.ack();
+                }
+
+                for (int updates : insertStatement.executeBatch()) {
+                    if (updates == Statement.SUCCESS_NO_INFO) {
+                        noInfo = true;
+                        continue;
+                    }
+                    updateCount += updateCount;
+                }
+                connection.commit();
+                swapList.forEach(tRecord -> tRecord.ack());
+            } catch (Exception e) {
+                log.error("Got exception ", e);
+                swapList.forEach(tRecord -> tRecord.fail());
+            }
+
+            if (swapList.size() != updateCount) {
+                log.error("Update count {}  not match total number of records 
{}", updateCount, swapList.size());
+            }
+
+            // finish flush
+            if (log.isDebugEnabled()) {
+                log.debug("Finish flush, queue size: {}", swapList.size());
+            }
+            isFlushing.set(false);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("Already in flushing state, will not flush, queue 
size: {}", incomingList.size());
+            }
+        }
+    }
+
+}
diff --git 
a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java
 
b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java
new file mode 100644
index 0000000000..ec2822010e
--- /dev/null
+++ 
b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import java.sql.PreparedStatement;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.util.Utf8;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
+
+/**
+ * A simple Jdbc sink that expects the value of every input record to be Avro binary data matching
+ * the schema given in the sink configuration.
+ *
+ * <p>For each column of the target table the Avro field with the same name is read and bound to the
+ * corresponding parameter of the INSERT statement.
+ */
+@Slf4j
+public class JdbcAvroSchemaSink extends JdbcAbstractSink<byte[]> {
+
+    private Schema avroSchema = null;
+    private DatumReader<GenericRecord> reader = null;
+
+
+    /**
+     * Opens the underlying Jdbc sink and prepares the Avro reader for the configured schema.
+     *
+     * @param config      raw sink configuration
+     * @param sinkContext sink context, passed through to the parent class
+     * @throws Exception if the parent sink cannot be opened or the schema cannot be parsed
+     */
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+        super.open(config, sinkContext);
+        // get reader, and read value out as GenericRecord
+        if (avroSchema == null || reader == null) {
+            // Schema.Parser replaces the deprecated static Schema.parse(String).
+            avroSchema = new Schema.Parser().parse(schema);
+            reader = new GenericDatumReader<>(avroSchema);
+        }
+        log.info("open JdbcAvroSchemaSink with schema: {}, and tableDefinition: {}",
+            schema, tableDefinition.toString());
+    }
+
+
+    /**
+     * Decodes the record value as an Avro {@link GenericRecord} and binds one statement parameter per
+     * table column, in the order of the table definition.
+     *
+     * @param statement the prepared INSERT statement
+     * @param message   the record carrying the Avro-encoded value
+     * @throws Exception if decoding fails or a field has a type that is not supported
+     */
+    @Override
+    public void bindValue(PreparedStatement statement,
+                          Record<byte[]> message) throws Exception {
+
+        byte[] value = message.getValue();
+        GenericRecord record = reader.read(null, DecoderFactory.get().binaryDecoder(value, null));
+
+        int index = 1;
+        for (ColumnId columnId : tableDefinition.getColumns()) {
+            Object obj = record.get(columnId.getName());
+            if (obj == null) {
+                // A missing value is stored as SQL NULL using the column's own java.sql.Types code.
+                statement.setNull(index, columnId.getType());
+            } else {
+                setColumnValue(statement, index, obj);
+            }
+            index++;
+            // Debug level and a placeholder: this runs once per column of every record.
+            if (log.isDebugEnabled()) {
+                log.debug("set column value: {}", obj);
+            }
+        }
+    }
+
+    /**
+     * Binds a single non-null value using the setter that matches its runtime type.
+     *
+     * @throws Exception if the runtime type of {@code value} is not supported
+     */
+    private static void setColumnValue(PreparedStatement statement, int index, Object value) throws Exception {
+        if (value instanceof Integer) {
+            statement.setInt(index, (Integer) value);
+        } else if (value instanceof Long) {
+            statement.setLong(index, (Long) value);
+        } else if (value instanceof Double) {
+            statement.setDouble(index, (Double) value);
+        } else if (value instanceof Float) {
+            statement.setFloat(index, (Float) value);
+        } else if (value instanceof Boolean) {
+            statement.setBoolean(index, (Boolean) value);
+        } else if (value instanceof CharSequence) {
+            // Covers Avro's Utf8 as well as plain String values.
+            statement.setString(index, value.toString());
+        } else if (value instanceof Short) {
+            statement.setShort(index, (Short) value);
+        } else {
+            throw new Exception("Not support value type, need to add it. " + value.getClass());
+        }
+    }
+}
+
diff --git 
a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java 
b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
new file mode 100644
index 0000000000..3419811e0a
--- /dev/null
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import lombok.*;
+import lombok.experimental.Accessors;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Configuration of the Jdbc sink.
+ *
+ * <p>{@code @Data} already generates getters, setters, {@code equals}/{@code hashCode} and
+ * {@code toString}; the explicit {@code @ToString} only exists to keep the password out of the
+ * generated string so that it cannot end up in log output.
+ */
+@Data
+@EqualsAndHashCode
+@ToString(exclude = "password")
+@Accessors(chain = true)
+public class JdbcSinkConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private String userName;
+    private String password;
+    private String jdbcUrl;
+    private String tableName;
+
+    // schema for input topic
+    private String schema;
+
+    // Optional
+    // interval between two periodic flushes, in milliseconds
+    private int timeoutMs = 500;
+    // number of queued records that triggers an immediate flush
+    private int batchSize = 200;
+
+    /**
+     * Reads the configuration from a YAML file.
+     *
+     * @param yamlFile path of the YAML file
+     * @return the parsed configuration
+     * @throws IOException if the file cannot be read or parsed
+     */
+    public static JdbcSinkConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), JdbcSinkConfig.class);
+    }
+
+    /**
+     * Builds the configuration from a key/value map by serializing the map to JSON and reading it back.
+     *
+     * @param map configuration entries keyed by field name
+     * @return the parsed configuration
+     * @throws IOException if the map cannot be converted
+     */
+    public static JdbcSinkConfig load(Map<String, Object> map) throws IOException {
+        // One mapper is enough for both directions of the round trip.
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(mapper.writeValueAsString(map), JdbcSinkConfig.class);
+    }
+}
diff --git 
a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java 
b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
new file mode 100644
index 0000000000..e95990903d
--- /dev/null
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.IntStream;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Jdbc Utils
+ */
+@Slf4j
+public class JdbcUtils {
+
+    @Data(staticConstructor = "of")
+    @Setter
+    @Getter
+    @EqualsAndHashCode
+    @ToString
+    public static class TableId {
+        private final String catalogName;
+        private final String schemaName;
+        private final String tableName;
+    }
+
+    @Data(staticConstructor = "of")
+    @Setter
+    @Getter
+    @EqualsAndHashCode
+    @ToString
+    public static class ColumnId {
+        private final TableId tableId;
+        private final String name;
+        // SQL type from java.sql.Types
+        private final int type;
+        private final String typeName;
+        // column position in table
+        private final int position;
+    }
+
+    @Data(staticConstructor = "of")
+    @Setter
+    @Getter
+    @EqualsAndHashCode
+    @ToString
+    public static class TableDefinition {
+        private final TableId tableId;
+        private final List<ColumnId> columns;
+    }
+
+    /**
+     * Given a driver type(such as mysql), return its jdbc driver class name.
+     * TODO: test and support more types, also add Driver in pom file.
+     */
+    public static String getDriverClassName(String driver) throws Exception {
+        if (driver.equals("mysql")) {
+            return "com.mysql.jdbc.Driver";
+        } if (driver.equals("sqlite")) {
+            return "org.sqlite.JDBC";
+        } else {
+            throw new Exception("Not tested jdbc driver type: " + driver);
+        }
+    }
+
+    /**
+     * Get the {@link Connection} for the given jdbcUrl.
+     */
+    public static Connection getConnection(String jdbcUrl, Properties 
properties) throws Exception {
+        String driver = jdbcUrl.split(":")[1];
+        String driverClassName = getDriverClassName(driver);
+        Class.forName(driverClassName);
+
+        return DriverManager.getConnection(jdbcUrl, properties);
+    }
+
+    /**
+     * Get the {@link TableId} for the given tableName.
+     */
+    public static TableId getTableId(Connection connection, String tableName) 
throws Exception {
+        DatabaseMetaData metadata = connection.getMetaData();
+        try (ResultSet rs = metadata.getTables(null, null, tableName, new 
String[]{"TABLE"})) {
+            if (rs.next()) {
+                String catalogName = rs.getString(1);
+                String schemaName = rs.getString(2);
+                String gotTableName = rs.getString(3);
+                checkState(tableName.equals(gotTableName),
+                    "TableName not match: " + tableName + " Got: " + 
gotTableName);
+                if (log.isDebugEnabled()) {
+                    log.debug("Get Table: {}, {}, {}", catalogName, 
schemaName, tableName);
+                }
+                return TableId.of(catalogName, schemaName, tableName);
+            } else {
+                throw new Exception("Not able to find table: " + tableName);
+            }
+        }
+    }
+
+    /**
+     * Get the {@link TableDefinition} for the given table.
+     */
+    public static TableDefinition getTableDefinition(Connection connection, 
TableId tableId) throws Exception {
+        TableDefinition table = TableDefinition.of(tableId, 
Lists.newArrayList());
+
+        try (ResultSet rs = connection.getMetaData().getColumns(
+            tableId.getCatalogName(),
+            tableId.getSchemaName(),
+            tableId.getTableName(),
+            null
+        )) {
+            while (rs.next()) {
+                final String columnName = rs.getString(4);
+
+                final int sqlDataType = rs.getInt(5);
+                final String typeName = rs.getString(6);
+                final int position = rs.getInt(17);
+
+                table.columns.add(ColumnId.of(tableId, columnName, 
sqlDataType, typeName, position));
+                if (log.isDebugEnabled()) {
+                    log.debug("Get column. name: {}, data type: {}, position: 
{}", columnName, typeName, position);
+                }
+            }
+            return table;
+        }
+    }
+
+    public static String buildInsertSql(TableDefinition table) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("INSERT INTO ");
+        builder.append(table.tableId.getTableName());
+        builder.append("(");
+
+        table.columns.forEach(columnId -> 
builder.append(columnId.getName()).append(","));
+        builder.deleteCharAt(builder.length() - 1);
+
+        builder.append(") VALUES(");
+        IntStream.range(0, table.columns.size() - 1).forEach(i -> 
builder.append("?,"));
+        builder.append("?)");
+
+        return builder.toString();
+    }
+
+    public static PreparedStatement buildInsertStatement(Connection 
connection, String insertSQL) throws SQLException {
+        return connection.prepareStatement(insertSQL);
+    }
+
+}
diff --git a/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml 
b/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000000..d9d06bde47
--- /dev/null
+++ b/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: jdbc
+description: Jdbc sink
+sinkClass: org.apache.pulsar.io.jdbc.JdbcAvroSchemaSink
diff --git 
a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java 
b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
new file mode 100644
index 0000000000..33bb859547
--- /dev/null
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import com.google.common.collect.Maps;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.Map;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.source.PulsarRecord;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Jdbc Sink test: writes one Avro-encoded record through {@link JdbcAvroSchemaSink} into a SQLite
+ * table and reads it back.
+ */
+@Slf4j
+public class JdbcSinkTest {
+    private final SqliteUtils sqliteUtils = new SqliteUtils(getClass().getSimpleName());
+
+    /**
+     * A Simple class to test jdbc class
+     */
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Foo {
+        private String field1;
+        private String field2;
+        private int field3;
+    }
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        sqliteUtils.setUp();
+    }
+
+    @AfterMethod
+    public void tearDown() throws Exception {
+        sqliteUtils.tearDown();
+    }
+
+    @Test
+    public void TestOpenAndWriteSink() throws Exception {
+        JdbcAvroSchemaSink jdbcSink;
+        Map<String, Object> conf;
+        String tableName = "TestOpenAndWriteSink";
+
+        String jdbcUrl = sqliteUtils.sqliteUri();
+        conf = Maps.newHashMap();
+        conf.put("jdbcUrl", jdbcUrl);
+        conf.put("tableName", tableName);
+
+        jdbcSink = new JdbcAvroSchemaSink();
+
+        sqliteUtils.createTable(
+            "CREATE TABLE " + tableName + "(" +
+                "    field1  TEXT," +
+                "    field2  TEXT," +
+                "    field3 INTEGER," +
+                "PRIMARY KEY (field1));"
+        );
+
+        // prepare a foo Record; the two text fields differ so that swapped columns are detected
+        Foo obj = new Foo();
+        obj.setField1("ValueOfField1");
+        obj.setField2("ValueOfField2");
+        obj.setField3(3);
+        AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
+        // decode with an explicit charset instead of the platform default
+        String schemaJson = new String(schema.getSchemaInfo().getSchema(), java.nio.charset.StandardCharsets.UTF_8);
+        conf.put("schema", schemaJson);
+        log.info("schema: {}", schemaJson);
+
+        byte[] bytes = schema.encode(obj);
+        ByteBuf payload = Unpooled.copiedBuffer(bytes);
+        // NOTE(review): the raw MessageImpl type is kept from the original call; conf is a
+        // Map<String, Object>, so a parameterized call may not compile - confirm against MessageImpl.
+        Message<byte[]> message = new MessageImpl("77:777", conf, payload, Schema.BYTES);
+        Record<byte[]> record = PulsarRecord.<byte[]>builder()
+            .message(message)
+            .topicName("fake_topic_name")
+            .build();
+
+        log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
+            obj.toString(),
+            message.getValue().toString(),
+            record.getValue().toString());
+
+        // change batchSize to 1, to flush on each write.
+        conf.put("batchSize", 1);
+        // open should success
+        jdbcSink.open(conf, null);
+
+        // From here on the sink holds a connection and a scheduler thread: always close it,
+        // even when an assertion below fails.
+        try {
+            // write should success.
+            jdbcSink.write(record);
+            log.info("executed write");
+            // sleep to wait backend flush complete
+            Thread.sleep(500);
+
+            // value has been written to db, read it out and verify.
+            String querySql = "SELECT * FROM " + tableName;
+            sqliteUtils.select(querySql, (resultSet) -> {
+                // TestNG argument order is (actual, expected)
+                Assert.assertEquals(resultSet.getString(1), obj.getField1());
+                Assert.assertEquals(resultSet.getString(2), obj.getField2());
+                Assert.assertEquals(resultSet.getInt(3), obj.getField3());
+            });
+        } finally {
+            jdbcSink.close();
+        }
+    }
+
+}
diff --git 
a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java 
b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java
new file mode 100644
index 0000000000..d58802d715
--- /dev/null
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.io.jdbc.JdbcUtils.TableDefinition;
+import org.apache.pulsar.io.jdbc.JdbcUtils.TableId;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Checks the table-introspection and SQL-building helpers of {@link JdbcUtils}
+ * against a SQLite database that is created before and removed after every test.
+ */
+@Slf4j
+public class JdbcUtilsTest {
+
+    // The SQLite helper is named after this test class.
+    private final SqliteUtils sqliteUtils = new SqliteUtils(getClass().getSimpleName());
+
+    @BeforeMethod
+    public void setUp() throws IOException, SQLException {
+        sqliteUtils.setUp();
+    }
+
+    @AfterMethod
+    public void tearDown() throws IOException, SQLException {
+        sqliteUtils.tearDown();
+    }
+
+    /**
+     * Creates a ten-column table and verifies, in order, the resolved table id,
+     * a sample of the column definitions, and the generated INSERT statement.
+     */
+    @Test
+    public void TestGetTableId() throws Exception {
+        final String tableName = "TestGetTableId";
+        final String createSql =
+            "CREATE TABLE " + tableName + "(" +
+                "    firstName  TEXT," +
+                "    lastName  TEXT," +
+                "    age INTEGER," +
+                "    bool  NUMERIC," +
+                "    byte  INTEGER," +
+                "    short INTEGER NULL," +
+                "    long INTEGER," +
+                "    float NUMERIC," +
+                "    double NUMERIC," +
+                "    bytes BLOB, " +
+                "PRIMARY KEY (firstName, lastName));";
+        sqliteUtils.createTable(createSql);
+
+        final Connection conn = sqliteUtils.getConnection();
+
+        // Step 1: the table id must carry the name the table was created with.
+        log.info("verify getTableId");
+        final TableId tableId = JdbcUtils.getTableId(conn, tableName);
+        Assert.assertEquals(tableId.getTableName(), tableName);
+
+        // Step 2: spot-check the name and type of columns 0, 2 and 7.
+        log.info("verify getTableDefinition");
+        final TableDefinition definition = JdbcUtils.getTableDefinition(conn, tableId);
+        assertColumn(definition, 0, "firstName", "TEXT");
+        assertColumn(definition, 2, "age", "INTEGER");
+        assertColumn(definition, 7, "float", "NUMERIC");
+
+        // Step 3: the INSERT statement lists every column and one placeholder per column.
+        log.info("verify buildInsertSql");
+        final String expectedInsert = "INSERT INTO " + tableName +
+            "(firstName,lastName,age,bool,byte,short,long,float,double,bytes)" +
+            " VALUES(?,?,?,?,?,?,?,?,?,?)";
+        final String actualInsert = JdbcUtils.buildInsertSql(definition);
+        Assert.assertEquals(actualInsert, expectedInsert);
+    }
+
+    /** Asserts the name and then the type name of the column at {@code index}. */
+    private static void assertColumn(TableDefinition definition, int index, String name, String typeName) {
+        Assert.assertEquals(definition.getColumns().get(index).getName(), name);
+        Assert.assertEquals(definition.getColumns().get(index).getTypeName(), typeName);
+    }
+
+}
diff --git 
a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java 
b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java
new file mode 100644
index 0000000000..3b4a01aaab
--- /dev/null
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Test helper that owns a single SQLite connection to a database file named
+ * {@code <testId>.db} (a relative path) and offers small wrappers for DDL,
+ * updates and queries.
+ *
+ * <p>Call {@link #setUp()} before use and {@link #tearDown()} afterwards; both
+ * remove the database file so every test starts from an empty database.
+ */
+@Slf4j
+public final class SqliteUtils {
+
+    static {
+        // Load the SQLite JDBC driver once; without it no connection can be opened.
+        try {
+            Class.forName("org.sqlite.JDBC");
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Callback invoked by {@link #select} once for every row of the result set. */
+    @FunctionalInterface
+    public interface ResultSetReadCallback {
+        void read(final ResultSet rs) throws SQLException;
+    }
+
+    private final Path dbPath;
+
+    private Connection connection;
+
+    /**
+     * @return the connection opened by {@link #setUp()}, or {@code null} if
+     *         {@code setUp()} has not been called yet
+     */
+    public Connection getConnection() {
+        return connection;
+    }
+
+    /**
+     * @param testId prefix of the database file name ({@code testId + ".db"})
+     */
+    public SqliteUtils(String testId) {
+        dbPath = Paths.get(testId + ".db");
+    }
+
+    /** @return the JDBC url pointing at this helper's database file */
+    public String sqliteUri() {
+        return "jdbc:sqlite:" + dbPath;
+    }
+
+    /**
+     * Deletes any left-over database file and opens a fresh connection with
+     * auto-commit disabled.
+     */
+    public void setUp() throws SQLException, IOException {
+        Files.deleteIfExists(dbPath);
+        connection = DriverManager.getConnection(sqliteUri());
+        connection.setAutoCommit(false);
+    }
+
+    /**
+     * Closes the connection (if one was opened) and removes the database file.
+     * The file is removed even when closing the connection fails, so a failed
+     * test does not leave state behind for the next one.
+     */
+    public void tearDown() throws SQLException, IOException {
+        try {
+            // setUp() may have failed before the connection was assigned.
+            if (connection != null) {
+                connection.close();
+            }
+        } finally {
+            Files.deleteIfExists(dbPath);
+        }
+    }
+
+    /** Executes the given CREATE TABLE statement and commits it. */
+    public void createTable(final String createSql) throws SQLException {
+        execute(createSql);
+    }
+
+    /**
+     * Drops the table if it exists, then pauses briefly.
+     *
+     * @throws RuntimeException if the pause is interrupted; the thread's
+     *         interrupt flag is restored before the exception is thrown
+     */
+    public void deleteTable(final String table) throws SQLException {
+        execute("DROP TABLE IF EXISTS " + table);
+
+        //random errors of table not being available happens in the unit tests
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException e) {
+            // Keep the interruption visible to callers further up the stack.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Runs the query and hands every row to {@code callback}.
+     *
+     * @return the number of rows that were read
+     */
+    public int select(final String query, final SqliteUtils.ResultSetReadCallback callback) throws SQLException {
+        int count = 0;
+        try (Statement stmt = connection.createStatement();
+             ResultSet rs = stmt.executeQuery(query)) {
+            while (rs.next()) {
+                callback.read(rs);
+                count++;
+            }
+        }
+        return count;
+    }
+
+    /**
+     * Executes an update/DDL statement and commits it explicitly, because
+     * {@link #setUp()} turns auto-commit off.
+     */
+    public void execute(String sql) throws SQLException {
+        try (Statement stmt = connection.createStatement()) {
+            stmt.executeUpdate(sql);
+            connection.commit();
+        }
+    }
+
+}
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index f1494db6e7..e89cc0271a 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -38,6 +38,7 @@
     <module>kafka</module>
     <module>rabbitmq</module>
     <module>kinesis</module>
+    <module>jdbc</module>
     <module>data-genenator</module>
   </modules>
 
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index a3a46944fc..6d3fdc4064 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -78,6 +78,27 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>mysql</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>mysql</groupId>
+      <artifactId>mysql-connector-java</artifactId>
+      <version>${mysql-jdbc.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-io-jdbc</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 67136a6ae1..5cefc6a67c 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -36,18 +36,22 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
 import 
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
 import org.apache.pulsar.tests.integration.io.CassandraSinkTester;
+import org.apache.pulsar.tests.integration.io.JdbcSinkTester;
+import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo;
 import org.apache.pulsar.tests.integration.io.KafkaSinkTester;
 import org.apache.pulsar.tests.integration.io.KafkaSourceTester;
 import org.apache.pulsar.tests.integration.io.SinkTester;
 import org.apache.pulsar.tests.integration.io.SourceTester;
 import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 /**
@@ -70,15 +74,20 @@ public void testCassandraSink() throws Exception {
         testSink(new CassandraSinkTester());
     }
 
+    /** Runs the shared sink test flow with the JDBC sink tester. */
+    @Test
+    public void testJdbcSink() throws Exception {
+        final SinkTester jdbcTester = new JdbcSinkTester();
+        testSink(jdbcTester);
+    }
+
     private void testSink(SinkTester tester) throws Exception {
         tester.findSinkServiceContainer(pulsarCluster.getExternalServices());
 
         final String tenant = TopicName.PUBLIC_TENANT;
         final String namespace = TopicName.DEFAULT_NAMESPACE;
         final String inputTopicName = "test-sink-connector-"
-            + functionRuntimeType + "-input-topic-" + randomName(8);
+            + tester.getSinkType() + "-" + functionRuntimeType + 
"-input-topic-" + randomName(8);
         final String sinkName = "test-sink-connector-"
-            + functionRuntimeType + "-name-" + randomName(8);
+            + tester.getSinkType() + "-" + functionRuntimeType + "-name-" + 
randomName(8);
         final int numMessages = 20;
 
         // prepare the testing environment for sink
@@ -94,7 +103,12 @@ private void testSink(SinkTester tester) throws Exception {
         getSinkStatus(tenant, namespace, sinkName);
 
         // produce messages
-        Map<String, String> kvs = produceMessagesToInputTopic(inputTopicName, 
numMessages);
+        Map<String, String> kvs;
+        if (tester instanceof JdbcSinkTester) {
+            kvs = produceSchemaMessagesToInputTopic(inputTopicName, 
numMessages, AvroSchema.of(Foo.class));
+        } else {
+            kvs = produceMessagesToInputTopic(inputTopicName, numMessages);
+        }
 
         // wait for sink to process messages
         waitForProcessingMessages(tenant, namespace, sinkName, numMessages);
@@ -202,6 +216,36 @@ protected void getSinkStatus(String tenant, String 
namespace, String sinkName) t
         return kvs;
     }
 
+    /**
+     * Produces {@code numMessages} schema-encoded {@link Foo} messages to the input
+     * topic. Used for the {@link JdbcSinkTester}.
+     *
+     * <p>Message {@code i} has the key {@code "key-" + i} and carries a {@code Foo}
+     * with {@code field1 = "field1_" + i}, {@code field2 = "field2_" + i} and
+     * {@code field3 = i}. The encoded bytes are wrapped into a String because the
+     * producer is created with {@code Schema.STRING}; UTF-8 is used explicitly so
+     * the result does not depend on the platform default charset.
+     *
+     * @param inputTopicName topic the messages are sent to
+     * @param numMessages    number of messages to produce
+     * @param schema         schema used to encode each {@code Foo}
+     * @return the produced key/value pairs in the order they were sent
+     * @throws Exception if the client or producer cannot be created or a send fails
+     */
+    protected Map<String, String> produceSchemaMessagesToInputTopic(String inputTopicName,
+                                                                    int numMessages,
+                                                                    Schema<Foo> schema) throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+            .build();
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+            .topic(inputTopicName)
+            .create();
+        // LinkedHashMap keeps the send order for the later validation step.
+        Map<String, String> kvs = new LinkedHashMap<>();
+        for (int i = 0; i < numMessages; i++) {
+            String key = "key-" + i;
+
+            Foo obj = new Foo();
+            obj.setField1("field1_" + i);
+            obj.setField2("field2_" + i);
+            obj.setField3(i);
+            String value = new String(schema.encode(obj), java.nio.charset.StandardCharsets.UTF_8);
+
+            kvs.put(key, value);
+            producer.newMessage()
+                .key(key)
+                .value(value)
+                .send();
+        }
+        return kvs;
+    }
+
     protected void waitForProcessingMessages(String tenant,
                                              String namespace,
                                              String sinkName,
@@ -226,8 +270,8 @@ protected void waitForProcessingMessages(String tenant,
                 // expected in early iterations
             }
 
-            log.info("{} ms has elapsed but the sink hasn't process {} 
messages, backoff to wait for another 1 second",
-                stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
+            log.info("{} ms has elapsed but the sink {} hasn't process {} 
messages, backoff to wait for another 1 second",
+                stopwatch.elapsed(TimeUnit.MILLISECONDS), sinkName, 
numMessages);
             TimeUnit.SECONDS.sleep(1);
         }
     }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index fe4795d96e..7a47f77ef0 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -52,7 +52,7 @@ protected PulsarFunctionsTestBase(FunctionRuntimeType 
functionRuntimeType) {
 
     @BeforeClass
     public void setupFunctionWorkers() {
-        final int numFunctionWorkers = 2;
+        final int numFunctionWorkers = 3;
         log.info("Setting up {} function workers : function runtime type = {}",
             numFunctionWorkers, functionRuntimeType);
         pulsarCluster.setupFunctionWorkers(randomName(5), functionRuntimeType, 
numFunctionWorkers);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
new file mode 100644
index 0000000000..6a102f1e39
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Map;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.MySQLContainer;
+
+/**
+ * A tester for testing jdbc sink.
+ * This will use MySql as DB server.
+ *
+ * <p>The tester configures the sink with the Avro schema of {@link Foo}, creates
+ * the target table in {@link #prepareSink()} and compares the table content with
+ * the produced messages in {@link #validateSinkResult(Map)}.
+ */
+@Slf4j
+public class JdbcSinkTester extends SinkTester {
+
+    /**
+     * A simple class to test the jdbc sink; its three fields map to the three
+     * columns of the test table.
+     */
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Foo {
+        private String field1;
+        private String field2;
+        private int field3;
+    }
+
+    private static final String NAME = "jdbc";
+
+    // Explicit charset for every byte[] <-> String conversion in this class.
+    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;
+
+    private MySQLContainer<?> mySQLContainer;
+    private AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
+    private String tableName = "test";
+    private Connection connection;
+
+    public JdbcSinkTester() {
+        super(NAME);
+
+        // container default value is test
+        sinkConfig.put("userName", "test");
+        sinkConfig.put("password", "test");
+        sinkConfig.put("tableName", tableName);
+
+        // prepare schema
+        String schemaDefinition = new String(schema.getSchemaInfo().getSchema(), UTF_8);
+        sinkConfig.put("schema", schemaDefinition);
+        log.info("schema: {}", schemaDefinition);
+        // flush on every write
+        sinkConfig.put("batchSize", 1);
+    }
+
+    /**
+     * Looks up the container registered under the name {@code "mysql"}.
+     *
+     * @throws IllegalStateException if that entry is missing or is not a {@code MySQLContainer}
+     */
+    @Override
+    public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) {
+        GenericContainer<?> container = containers.get("mysql");
+        checkState(container instanceof MySQLContainer,
+            "No MySQL service found in the cluster");
+
+        this.mySQLContainer = (MySQLContainer<?>) container;
+        log.info("find sink service container: {}", mySQLContainer.getContainerName());
+    }
+
+    /**
+     * Points the sink at the MySQL server, opens the connection used later for
+     * validation and creates the target table.
+     */
+    @Override
+    public void prepareSink() throws Exception {
+        String jdbcUrl = mySQLContainer.getJdbcUrl();
+        // we need set mysql server address in cluster network.
+        sinkConfig.put("jdbcUrl", "jdbc:mysql://mysql:3306/test");
+        String driver = mySQLContainer.getDriverClassName();
+        Class.forName(driver);
+
+        connection = DriverManager.getConnection(jdbcUrl, "test", "test");
+        log.info("getConnection: {}, jdbcurl: {}", connection, jdbcUrl);
+
+        // create table
+        String createTable = "CREATE TABLE " + tableName +
+            " (field1 TEXT, field2 TEXT, field3 INTEGER, PRIMARY KEY (field3))";
+        // Close the statement as soon as the DDL has run.
+        try (java.sql.Statement statement = connection.createStatement()) {
+            int ret = statement.executeUpdate(createTable);
+            log.info("created table in jdbc: {}, return value: {}", createTable, ret);
+        }
+    }
+
+    /**
+     * Reads every row of the table and checks it against the message produced
+     * under the key {@code "key-" + field3}. Fails when a row has no matching
+     * message, when a field differs, or when the number of rows differs from the
+     * number of produced messages.
+     *
+     * @param kvs the produced messages, keyed by message key
+     */
+    @Override
+    public void validateSinkResult(Map<String, String> kvs) {
+        log.info("Query table content from mysql server: {}", tableName);
+        String querySql = "SELECT * FROM " + tableName;
+        int rowCount = 0;
+        try {
+            // backend flush may not complete.
+            Thread.sleep(1000);
+
+            try (PreparedStatement statement = connection.prepareStatement(querySql);
+                 ResultSet rs = statement.executeQuery()) {
+                while (rs.next()) {
+                    String field1 = rs.getString(1);
+                    String field2 = rs.getString(2);
+                    int field3 = rs.getInt(3);
+
+                    String value = kvs.get("key-" + field3);
+                    if (value == null) {
+                        fail("Found a row without a matching produced message, field3 = " + field3);
+                    }
+
+                    Foo obj = schema.decode(value.getBytes(UTF_8));
+                    assertEquals(obj.field1, field1);
+                    assertEquals(obj.field2, field2);
+                    assertEquals(obj.field3, field3);
+                    rowCount++;
+                }
+            }
+        } catch (InterruptedException e) {
+            // Restore the flag so the test runner can still observe the interruption.
+            Thread.currentThread().interrupt();
+            fail("Interrupted while waiting for the sink to flush.", e);
+        } catch (Exception e) {
+            log.error("Got exception: ", e);
+            fail("Got exception when op sql.", e);
+        }
+        // Without this check an empty table would pass the validation.
+        assertEquals(rowCount, kvs.size(), "Unexpected number of rows in table " + tableName);
+    }
+}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
index e20a933e4d..147f273abb 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
@@ -24,6 +24,7 @@
 import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.MySQLContainer;
 import org.testng.ITest;
 import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeSuite;
@@ -51,6 +52,7 @@ protected PulsarClusterSpecBuilder beforeSetupCluster(String 
clusterName, Pulsar
 
         // register external services
         Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
+
         final String kafkaServiceName = "kafka";
         externalServices.put(
             kafkaServiceName,
@@ -60,10 +62,19 @@ protected PulsarClusterSpecBuilder 
beforeSetupCluster(String clusterName, Pulsar
                 .withCreateContainerCmdModifier(createContainerCmd -> 
createContainerCmd
                     .withName(kafkaServiceName)
                     .withHostName(clusterName + "-" + kafkaServiceName)));
+
         final String cassandraServiceName = "cassandra";
         externalServices.put(
             cassandraServiceName,
             new CassandraContainer(clusterName));
+
+        // use mySQL for jdbc test
+        final String jdbcServiceName = "mysql";
+        externalServices.put(
+            jdbcServiceName,
+            new MySQLContainer()
+                .withExposedPorts(3306));
+
         builder = builder.externalServices(externalServices);
 
         return builder;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to