sijie closed pull request #2440: Issue 2313: create a JDBC sink connector URL: https://github.com/apache/incubator-pulsar/pull/2440
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml index 8cf7fce208..bb75e84e34 100644 --- a/distribution/io/src/assemble/io.xml +++ b/distribution/io/src/assemble/io.xml @@ -74,6 +74,11 @@ <outputDirectory>connectors</outputDirectory> <fileMode>644</fileMode> </file> + <file> + <source>${basedir}/../../pulsar-io/jdbc/target/pulsar-io-jdbc-${project.version}.nar</source> + <outputDirectory>connectors</outputDirectory> + <fileMode>644</fileMode> + </file> <file> <source>${basedir}/../../pulsar-io/data-genenator/target/pulsar-io-data-generator-${project.version}.nar</source> <outputDirectory>connectors</outputDirectory> diff --git a/pom.xml b/pom.xml index 4bb3cf63bb..c0dc20d492 100644 --- a/pom.xml +++ b/pom.xml @@ -166,6 +166,8 @@ flexible messaging model and an intuitive client API.</description> <aws-sdk.version>1.11.297</aws-sdk.version> <avro.version>1.8.2</avro.version> <jclouds.version>2.1.1</jclouds.version> + <sqlite-jdbc.version>3.8.11.2</sqlite-jdbc.version> + <mysql-jdbc.version>8.0.11</mysql-jdbc.version> <presto.version>0.206</presto.version> <!-- test dependencies --> @@ -818,6 +820,11 @@ flexible messaging model and an intuitive client API.</description> <artifactId>kafka</artifactId> <version>${testcontainers.version}</version> </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>mysql</artifactId> + <version>${testcontainers.version}</version> + </dependency> <dependency> <groupId>org.arquillian.cube</groupId> <artifactId>arquillian-cube-docker</artifactId> @@ -1083,7 +1090,7 @@ flexible messaging model and an intuitive client API.</description> 
<exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude> <exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude> <exclude>bin/proto/MLDataFormats_pb2.py</exclude> - + <!-- pulasr-io-connector kinesis : auto generated files from flatbuffer schema --> <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java</exclude> <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java</exclude> diff --git a/pulsar-io/jdbc/lombok.config b/pulsar-io/jdbc/lombok.config new file mode 100644 index 0000000000..9a9adee272 --- /dev/null +++ b/pulsar-io/jdbc/lombok.config @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +## This file is to fix the conflict with jackson error like this: +## com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of ... 
+lombok.anyConstructor.addConstructorProperties=true +config.stopBubbling = true diff --git a/pulsar-io/jdbc/pom.xml b/pulsar-io/jdbc/pom.xml new file mode 100644 index 0000000000..eed85883ec --- /dev/null +++ b/pulsar-io/jdbc/pom.xml @@ -0,0 +1,96 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-io</artifactId> + <version>2.2.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>pulsar-io-jdbc</artifactId> + <name>Pulsar IO :: Jdbc</name> + + <dependencies> + + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-io-core</artifactId> + <version>${project.version}</version> + </dependency> + + + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>${avro.version}</version> + </dependency> + + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-functions-instance</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.xerial</groupId> + <artifactId>sqlite-jdbc</artifactId> + <version>${sqlite-jdbc.version}</version> + <scope>runtime</scope> + </dependency> + + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <version>${mysql-jdbc.version}</version> + <scope>runtime</scope> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-yaml</artifactId> + </dependency> + + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-client-original</artifactId> + <version>${project.version}</version> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + + +</project> diff --git 
a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java new file mode 100644 index 0000000000..425fb57ac1 --- /dev/null +++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.jdbc; + +import static jersey.repackaged.com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.Statement; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; + +/** + * A Simple abstract class for Jdbc sink + * Users need to implement extractKeyValue function to use this sink + */ +@Slf4j +public abstract class JdbcAbstractSink<T> implements Sink<T> { + // ----- Runtime fields + private JdbcSinkConfig jdbcSinkConfig; + @Getter + private Connection connection; + private String jdbcUrl; + private String tableName; + + private JdbcUtils.TableId tableId; + private PreparedStatement insertStatement; + + // TODO: turn to getSchema from SinkContext.getTopicSchema.getSchema(inputTopic) + protected String schema; + protected JdbcUtils.TableDefinition tableDefinition; + + // for flush + private List<Record<T>> incomingList; + private List<Record<T>> swapList; + private AtomicBoolean isFlushing; + private int timeoutMs; + private int batchSize; + private ScheduledExecutorService flushExecutor; + + @Override + public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { + jdbcSinkConfig = JdbcSinkConfig.load(config); + + jdbcUrl = jdbcSinkConfig.getJdbcUrl(); + if (jdbcSinkConfig.getJdbcUrl() == null) { + throw new IllegalArgumentException("Required jdbc Url not set."); + } + + Properties properties = new Properties(); + String username = jdbcSinkConfig.getUserName(); + String password = 
jdbcSinkConfig.getPassword(); + if (username != null) { + properties.setProperty("user", username); + } + if (password != null) { + properties.setProperty("password", password); + } + + connection = JdbcUtils.getConnection(jdbcUrl, properties); + connection.setAutoCommit(false); + log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, connection.getAutoCommit()); + + schema = jdbcSinkConfig.getSchema(); + tableName = jdbcSinkConfig.getTableName(); + tableId = JdbcUtils.getTableId(connection, tableName); + tableDefinition = JdbcUtils.getTableDefinition(connection, tableId); + insertStatement = JdbcUtils.buildInsertStatement(connection, JdbcUtils.buildInsertSql(tableDefinition)); + + timeoutMs = jdbcSinkConfig.getTimeoutMs(); + batchSize = jdbcSinkConfig.getBatchSize(); + incomingList = Lists.newArrayList(); + swapList = Lists.newArrayList(); + isFlushing = new AtomicBoolean(false); + + flushExecutor = Executors.newScheduledThreadPool(1); + flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, TimeUnit.MILLISECONDS); + } + + @Override + public void close() throws Exception { + if (connection != null && !connection.getAutoCommit()) { + connection.commit(); + } + flushExecutor.shutdown(); + if (connection != null) { + connection.close(); + } + log.info("Closed jdbc connection: {}", jdbcUrl); + } + + @Override + public void write(Record<T> record) throws Exception { + int number; + synchronized (incomingList) { + incomingList.add(record); + number = incomingList.size(); + } + + if (number == batchSize) { + flushExecutor.schedule(() -> flush(), 0, TimeUnit.MILLISECONDS); + } + } + + // bind value with a PreparedStatement + public abstract void bindValue( + PreparedStatement statement, + Record<T> message) throws Exception; + + + private void flush() { + // if not in flushing state, do flush, else return; + if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) { + if (log.isDebugEnabled()) { + log.debug("Starting flush, queue size: {}", 
incomingList.size()); + } + checkState(swapList.isEmpty(), + "swapList should be empty since last flush. swapList.size: " + swapList.size()); + + synchronized (incomingList) { + List<Record<T>> tmpList; + swapList.clear(); + + tmpList = swapList; + swapList = incomingList; + incomingList = tmpList; + } + + int updateCount = 0; + boolean noInfo = false; + try { + // bind each record value + for (Record<T> record : swapList) { + bindValue(insertStatement, record); + insertStatement.addBatch(); + record.ack(); + } + + for (int updates : insertStatement.executeBatch()) { + if (updates == Statement.SUCCESS_NO_INFO) { + noInfo = true; + continue; + } + updateCount += updateCount; + } + connection.commit(); + swapList.forEach(tRecord -> tRecord.ack()); + } catch (Exception e) { + log.error("Got exception ", e); + swapList.forEach(tRecord -> tRecord.fail()); + } + + if (swapList.size() != updateCount) { + log.error("Update count {} not match total number of records {}", updateCount, swapList.size()); + } + + // finish flush + if (log.isDebugEnabled()) { + log.debug("Finish flush, queue size: {}", swapList.size()); + } + isFlushing.set(false); + } else { + if (log.isDebugEnabled()) { + log.debug("Already in flushing state, will not flush, queue size: {}", incomingList.size()); + } + } + } + +} diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java new file mode 100644 index 0000000000..ec2822010e --- /dev/null +++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.io.jdbc; + +import java.sql.PreparedStatement; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.util.Utf8; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.SinkContext; +import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId; + +/** + * A Simple Jdbc sink, which assume input Record as AvroSchema format + */ +@Slf4j +public class JdbcAvroSchemaSink extends JdbcAbstractSink<byte[]> { + + private Schema avroSchema = null; + private DatumReader<GenericRecord> reader = null; + + + @Override + public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { + super.open(config, sinkContext); + // get reader, and read value out as GenericRecord + if (avroSchema == null || reader == null) { + avroSchema = Schema.parse(schema); + reader = new GenericDatumReader<>(avroSchema); + } + log.info("open JdbcAvroSchemaSink with schema: {}, and tableDefinition: {}", schema, tableDefinition.toString()); + } + + + public void bindValue(PreparedStatement statement, + Record<byte[]> message) throws Exception { + + byte[] value = message.getValue(); + GenericRecord record = 
reader.read(null, DecoderFactory.get().binaryDecoder(value, null)); + + int index = 1; + for (ColumnId columnId : tableDefinition.getColumns()) { + String colName = columnId.getName(); + Object obj = record.get(colName); + setColumnValue(statement, index++, obj); + log.info("set column value: {}", obj.toString()); + } + } + + private static void setColumnValue(PreparedStatement statement, int index, Object value) throws Exception { + if (value instanceof Integer) { + statement.setInt(index, (Integer) value); + } else if (value instanceof Long) { + statement.setLong(index, (Long) value); + } else if (value instanceof Double) { + statement.setDouble(index, (Double) value); + } else if (value instanceof Float) { + statement.setFloat(index, (Float) value); + } else if (value instanceof Boolean) { + statement.setBoolean(index, (Boolean) value); + } else if (value instanceof Utf8) { + statement.setString(index, ((Utf8)value).toString()); + } else if (value instanceof Short) { + statement.setShort(index, (Short) value); + } else { + throw new Exception("Not support value type, need to add it. " + value.getClass()); + } + } +} + diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java new file mode 100644 index 0000000000..3419811e0a --- /dev/null +++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.io.jdbc; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import lombok.*; +import lombok.experimental.Accessors; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; + +@Data +@Setter +@Getter +@EqualsAndHashCode +@ToString +@Accessors(chain = true) +public class JdbcSinkConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private String userName; + private String password; + private String jdbcUrl; + private String tableName; + + // schema for input topic + private String schema; + + // Optional + private int timeoutMs = 500; + private int batchSize = 200; + + public static JdbcSinkConfig load(String yamlFile) throws IOException { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + return mapper.readValue(new File(yamlFile), JdbcSinkConfig.class); + } + + public static JdbcSinkConfig load(Map<String, Object> map) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(new ObjectMapper().writeValueAsString(map), JdbcSinkConfig.class); + } +} diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java new file mode 100644 index 0000000000..e95990903d --- /dev/null +++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.io.jdbc; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.Properties; +import java.util.stream.IntStream; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +/** + * Jdbc Utils + */ +@Slf4j +public class JdbcUtils { + + @Data(staticConstructor = "of") + @Setter + @Getter + @EqualsAndHashCode + @ToString + public static class TableId { + private final String catalogName; + private final String schemaName; + private final String tableName; + } + + @Data(staticConstructor = "of") + @Setter + @Getter + @EqualsAndHashCode + @ToString + public static class ColumnId { + private final TableId tableId; + private final String name; + // SQL type from java.sql.Types + private final int type; + private final String typeName; + // column position in table + private final int position; + } + + 
@Data(staticConstructor = "of") + @Setter + @Getter + @EqualsAndHashCode + @ToString + public static class TableDefinition { + private final TableId tableId; + private final List<ColumnId> columns; + } + + /** + * Given a driver type(such as mysql), return its jdbc driver class name. + * TODO: test and support more types, also add Driver in pom file. + */ + public static String getDriverClassName(String driver) throws Exception { + if (driver.equals("mysql")) { + return "com.mysql.cj.jdbc.Driver"; + } else if (driver.equals("sqlite")) { + return "org.sqlite.JDBC"; + } else { + throw new Exception("Not tested jdbc driver type: " + driver); + } + } + + /** + * Get the {@link Connection} for the given jdbcUrl. + */ + public static Connection getConnection(String jdbcUrl, Properties properties) throws Exception { + String driver = jdbcUrl.split(":")[1]; + String driverClassName = getDriverClassName(driver); + Class.forName(driverClassName); + + return DriverManager.getConnection(jdbcUrl, properties); + } + + /** + * Get the {@link TableId} for the given tableName. + */ + public static TableId getTableId(Connection connection, String tableName) throws Exception { + DatabaseMetaData metadata = connection.getMetaData(); + try (ResultSet rs = metadata.getTables(null, null, tableName, new String[]{"TABLE"})) { + if (rs.next()) { + String catalogName = rs.getString(1); + String schemaName = rs.getString(2); + String gotTableName = rs.getString(3); + checkState(tableName.equals(gotTableName), + "TableName not match: " + tableName + " Got: " + gotTableName); + if (log.isDebugEnabled()) { + log.debug("Get Table: {}, {}, {}", catalogName, schemaName, tableName); + } + return TableId.of(catalogName, schemaName, tableName); + } else { + throw new Exception("Not able to find table: " + tableName); + } + } + } + + /** + * Get the {@link TableDefinition} for the given table. 
+ */ + public static TableDefinition getTableDefinition(Connection connection, TableId tableId) throws Exception { + TableDefinition table = TableDefinition.of(tableId, Lists.newArrayList()); + + try (ResultSet rs = connection.getMetaData().getColumns( + tableId.getCatalogName(), + tableId.getSchemaName(), + tableId.getTableName(), + null + )) { + while (rs.next()) { + final String columnName = rs.getString(4); + + final int sqlDataType = rs.getInt(5); + final String typeName = rs.getString(6); + final int position = rs.getInt(17); + + table.columns.add(ColumnId.of(tableId, columnName, sqlDataType, typeName, position)); + if (log.isDebugEnabled()) { + log.debug("Get column. name: {}, data type: {}, position: {}", columnName, typeName, position); + } + } + return table; + } + } + + public static String buildInsertSql(TableDefinition table) { + StringBuilder builder = new StringBuilder(); + builder.append("INSERT INTO "); + builder.append(table.tableId.getTableName()); + builder.append("("); + + table.columns.forEach(columnId -> builder.append(columnId.getName()).append(",")); + builder.deleteCharAt(builder.length() - 1); + + builder.append(") VALUES("); + IntStream.range(0, table.columns.size() - 1).forEach(i -> builder.append("?,")); + builder.append("?)"); + + return builder.toString(); + } + + public static PreparedStatement buildInsertStatement(Connection connection, String insertSQL) throws SQLException { + return connection.prepareStatement(insertSQL); + } + +} diff --git a/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml new file mode 100644 index 0000000000..d9d06bde47 --- /dev/null +++ b/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +name: jdbc +description: Jdbc sink +sinkClass: org.apache.pulsar.io.jdbc.JdbcAvroSchemaSink diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java new file mode 100644 index 0000000000..33bb859547 --- /dev/null +++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.jdbc; + +import com.google.common.collect.Maps; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.util.Map; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.schema.AvroSchema; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.functions.source.PulsarRecord; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Jdbc Sink test + */ +@Slf4j +public class JdbcSinkTest { + private final SqliteUtils sqliteUtils = new SqliteUtils(getClass().getSimpleName()); + + /** + * A Simple class to test jdbc class + */ + @Data + @ToString + @EqualsAndHashCode + public static class Foo { + private String field1; + private String field2; + private int field3; + } + + @BeforeMethod + public void setUp() throws Exception { + sqliteUtils.setUp(); + } + + @AfterMethod + public void tearDown() throws Exception { + sqliteUtils.tearDown(); + } + + @Test + public void TestOpenAndWriteSink() throws Exception { + JdbcAvroSchemaSink jdbcSink; + Map<String, Object> conf; + String tableName = "TestOpenAndWriteSink"; + + String jdbcUrl = sqliteUtils.sqliteUri(); + conf = Maps.newHashMap(); + conf.put("jdbcUrl", jdbcUrl); + conf.put("tableName", tableName); + + jdbcSink = new JdbcAvroSchemaSink(); + + sqliteUtils.createTable( + "CREATE TABLE " + tableName + "(" + + " field1 TEXT," + + " field2 TEXT," + + " field3 INTEGER," + + "PRIMARY KEY (field1));" + ); + + // prepare a foo Record + Foo obj = new Foo(); + obj.setField1("ValueOfField1"); + obj.setField2("ValueOfField1"); + obj.setField3(3); + AvroSchema<Foo> schema = AvroSchema.of(Foo.class); + 
conf.put("schema", new String(schema.getSchemaInfo().getSchema())); + log.info("schema: {}", new String(schema.getSchemaInfo().getSchema())); + + byte[] bytes = schema.encode(obj); + ByteBuf payload = Unpooled.copiedBuffer(bytes); + Message<byte[]> message = new MessageImpl("77:777", conf, payload, Schema.BYTES); + Record<byte[]> record = PulsarRecord.<byte[]>builder() + .message(message) + .topicName("fake_topic_name") + .build(); + + log.info("foo:{}, Message.getValue: {}, record.getValue: {}", + obj.toString(), + message.getValue().toString(), + record.getValue().toString()); + + // change batchSize to 1, to flush on each write. + conf.put("batchSize", 1); + // open should success + jdbcSink.open(conf, null); + + // write should success. + jdbcSink.write(record); + log.info("executed write"); + // sleep to wait backend flush complete + Thread.sleep(500); + + // value has been written to db, read it out and verify. + String querySql = "SELECT * FROM " + tableName; + sqliteUtils.select(querySql, (resultSet) -> { + Assert.assertEquals(obj.getField1(), resultSet.getString(1)); + Assert.assertEquals(obj.getField2(), resultSet.getString(2)); + Assert.assertEquals(obj.getField3(), resultSet.getInt(3)); + }); + + jdbcSink.close(); + } + +} diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java new file mode 100644 index 0000000000..d58802d715 --- /dev/null +++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.io.jdbc; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.io.jdbc.JdbcUtils.TableDefinition; +import org.apache.pulsar.io.jdbc.JdbcUtils.TableId; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Jdbc Utils test + */ +@Slf4j +public class JdbcUtilsTest { + + private final SqliteUtils sqliteUtils = new SqliteUtils(getClass().getSimpleName()); + @BeforeMethod + public void setUp() throws IOException, SQLException { + sqliteUtils.setUp(); + } + + @AfterMethod + public void tearDown() throws IOException, SQLException { + sqliteUtils.tearDown(); + } + + @Test + public void TestGetTableId() throws Exception { + String tableName = "TestGetTableId"; + + sqliteUtils.createTable( + "CREATE TABLE " + tableName + "(" + + " firstName TEXT," + + " lastName TEXT," + + " age INTEGER," + + " bool NUMERIC," + + " byte INTEGER," + + " short INTEGER NULL," + + " long INTEGER," + + " float NUMERIC," + + " double NUMERIC," + + " bytes BLOB, " + + "PRIMARY KEY (firstName, lastName));" + ); + + Connection connection = sqliteUtils.getConnection(); + + // Test getTableId + log.info("verify getTableId"); + TableId id = JdbcUtils.getTableId(connection, tableName); + Assert.assertEquals(id.getTableName(), tableName); + + // Test get getTableDefinition + log.info("verify getTableDefinition"); + TableDefinition table = 
JdbcUtils.getTableDefinition(connection, id); + Assert.assertEquals(table.getColumns().get(0).getName(), "firstName"); + Assert.assertEquals(table.getColumns().get(0).getTypeName(), "TEXT"); + Assert.assertEquals(table.getColumns().get(2).getName(), "age"); + Assert.assertEquals(table.getColumns().get(2).getTypeName(), "INTEGER"); + Assert.assertEquals(table.getColumns().get(7).getName(), "float"); + Assert.assertEquals(table.getColumns().get(7).getTypeName(), "NUMERIC"); + + // Test get getTableDefinition + log.info("verify buildInsertSql"); + String expctedStatement = "INSERT INTO " + tableName + + "(firstName,lastName,age,bool,byte,short,long,float,double,bytes)" + + " VALUES(?,?,?,?,?,?,?,?,?,?)"; + String statement = JdbcUtils.buildInsertSql(table); + Assert.assertEquals(statement, expctedStatement); + } + +} diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java new file mode 100644 index 0000000000..3b4a01aaab --- /dev/null +++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.jdbc; + + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public final class SqliteUtils { + + static { + try { + Class.forName("org.sqlite.JDBC"); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + + public interface ResultSetReadCallback { + void read(final ResultSet rs) throws SQLException; + } + + private final Path dbPath; + + private Connection connection; + + public Connection getConnection() { + return connection; + } + + public SqliteUtils(String testId) { + dbPath = Paths.get(testId + ".db"); + } + + public String sqliteUri() { + return "jdbc:sqlite:" + dbPath; + } + + public void setUp() throws SQLException, IOException { + Files.deleteIfExists(dbPath); + connection = DriverManager.getConnection(sqliteUri()); + connection.setAutoCommit(false); + } + + public void tearDown() throws SQLException, IOException { + connection.close(); + Files.deleteIfExists(dbPath); + } + + public void createTable(final String createSql) throws SQLException { + execute(createSql); + } + + public void deleteTable(final String table) throws SQLException { + execute("DROP TABLE IF EXISTS " + table); + + //random errors of table not being available happens in the unit tests + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + public int select(final String query, final SqliteUtils.ResultSetReadCallback callback) throws SQLException { + int count = 0; + try (Statement stmt = connection.createStatement()) { + try (ResultSet rs = stmt.executeQuery(query)) { + while (rs.next()) { + callback.read(rs); + count++; + } + } + } + return count; + } + + public void execute(String sql) throws SQLException { + 
try (Statement stmt = connection.createStatement()) { + stmt.executeUpdate(sql); + connection.commit(); + } + } + +} diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml index f1494db6e7..e89cc0271a 100644 --- a/pulsar-io/pom.xml +++ b/pulsar-io/pom.xml @@ -38,6 +38,7 @@ <module>kafka</module> <module>rabbitmq</module> <module>kinesis</module> + <module>jdbc</module> <module>data-genenator</module> </modules> diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml index a3a46944fc..6d3fdc4064 100644 --- a/tests/integration/pom.xml +++ b/tests/integration/pom.xml @@ -78,6 +78,27 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>mysql</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <version>${mysql-jdbc.version}</version> + <scope>runtime</scope> + </dependency> + + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-io-jdbc</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 67136a6ae1..5cefc6a67c 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -36,18 +36,22 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.schema.AvroSchema; import 
org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.tests.integration.docker.ContainerExecException; import org.apache.pulsar.tests.integration.docker.ContainerExecResult; import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator; import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime; import org.apache.pulsar.tests.integration.io.CassandraSinkTester; +import org.apache.pulsar.tests.integration.io.JdbcSinkTester; +import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo; import org.apache.pulsar.tests.integration.io.KafkaSinkTester; import org.apache.pulsar.tests.integration.io.KafkaSourceTester; import org.apache.pulsar.tests.integration.io.SinkTester; import org.apache.pulsar.tests.integration.io.SourceTester; import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.testng.Assert; import org.testng.annotations.Test; /** @@ -70,15 +74,20 @@ public void testCassandraSink() throws Exception { testSink(new CassandraSinkTester()); } + @Test + public void testJdbcSink() throws Exception { + testSink(new JdbcSinkTester()); + } + private void testSink(SinkTester tester) throws Exception { tester.findSinkServiceContainer(pulsarCluster.getExternalServices()); final String tenant = TopicName.PUBLIC_TENANT; final String namespace = TopicName.DEFAULT_NAMESPACE; final String inputTopicName = "test-sink-connector-" - + functionRuntimeType + "-input-topic-" + randomName(8); + + tester.getSinkType() + "-" + functionRuntimeType + "-input-topic-" + randomName(8); final String sinkName = "test-sink-connector-" - + functionRuntimeType + "-name-" + randomName(8); + + tester.getSinkType() + "-" + functionRuntimeType + "-name-" + randomName(8); final int numMessages = 20; // prepare the testing environment for sink @@ -94,7 +103,12 @@ private void testSink(SinkTester tester) throws Exception { getSinkStatus(tenant, 
namespace, sinkName); // produce messages - Map<String, String> kvs = produceMessagesToInputTopic(inputTopicName, numMessages); + Map<String, String> kvs; + if (tester instanceof JdbcSinkTester) { + kvs = produceSchemaMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(Foo.class)); + } else { + kvs = produceMessagesToInputTopic(inputTopicName, numMessages); + } // wait for sink to process messages waitForProcessingMessages(tenant, namespace, sinkName, numMessages); @@ -202,6 +216,36 @@ protected void getSinkStatus(String tenant, String namespace, String sinkName) t return kvs; } + // This for JdbcSinkTester + protected Map<String, String> produceSchemaMessagesToInputTopic(String inputTopicName, + int numMessages, Schema schema) throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) + .build(); + @Cleanup + Producer<String> producer = client.newProducer(Schema.STRING) + .topic(inputTopicName) + .create(); + LinkedHashMap<String, String> kvs = new LinkedHashMap<>(); + for (int i = 0; i < numMessages; i++) { + String key = "key-" + i; + + Foo obj = new Foo(); + obj.setField1("field1_" + i); + obj.setField2("field2_" + i); + obj.setField3(i); + String value = new String(schema.encode(obj)); + + kvs.put(key, value); + producer.newMessage() + .key(key) + .value(value) + .send(); + } + return kvs; + } + protected void waitForProcessingMessages(String tenant, String namespace, String sinkName, @@ -226,8 +270,8 @@ protected void waitForProcessingMessages(String tenant, // expected in early iterations } - log.info("{} ms has elapsed but the sink hasn't process {} messages, backoff to wait for another 1 second", - stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages); + log.info("{} ms has elapsed but the sink {} hasn't process {} messages, backoff to wait for another 1 second", + stopwatch.elapsed(TimeUnit.MILLISECONDS), sinkName, numMessages); TimeUnit.SECONDS.sleep(1); } } diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java index fe4795d96e..7a47f77ef0 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java @@ -52,7 +52,7 @@ protected PulsarFunctionsTestBase(FunctionRuntimeType functionRuntimeType) { @BeforeClass public void setupFunctionWorkers() { - final int numFunctionWorkers = 2; + final int numFunctionWorkers = 3; log.info("Setting up {} function workers : function runtime type = {}", numFunctionWorkers, functionRuntimeType); pulsarCluster.setupFunctionWorkers(randomName(5), functionRuntimeType, numFunctionWorkers); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java new file mode 100644 index 0000000000..6a102f1e39 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.io; + +import static com.google.common.base.Preconditions.checkState; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.Map; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.impl.schema.AvroSchema; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.MySQLContainer; + +/** + * A tester for testing jdbc sink. + * This will use MySql as DB server + */ +@Slf4j +public class JdbcSinkTester extends SinkTester { + + /** + * A Simple class to test jdbc class, + * + */ + @Data + @ToString + @EqualsAndHashCode + public static class Foo { + private String field1; + private String field2; + private int field3; + } + + private static final String NAME = "jdbc"; + + private MySQLContainer mySQLContainer; + private AvroSchema<Foo> schema = AvroSchema.of(Foo.class); + private String tableName = "test"; + private Connection connection; + + public JdbcSinkTester() { + super(NAME); + + // container default value is test + sinkConfig.put("userName", "test"); + sinkConfig.put("password", "test"); + sinkConfig.put("tableName", tableName); + + // prepare schema + sinkConfig.put("schema", new String(schema.getSchemaInfo().getSchema())); + log.info("schema: {}", new String(schema.getSchemaInfo().getSchema())); + sinkConfig.put("batchSize", 1); + } + + @Override + public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) { + GenericContainer<?> container = containers.get("mysql"); + checkState(container instanceof MySQLContainer, + "No MySQL service found in the cluster"); + + this.mySQLContainer 
= (MySQLContainer) container; + log.info("find sink service container: {}", mySQLContainer.getContainerName()); + } + + @Override + public void prepareSink() throws Exception { + String jdbcUrl = mySQLContainer.getJdbcUrl(); + // we need set mysql server address in cluster network. + sinkConfig.put("jdbcUrl", "jdbc:mysql://mysql:3306/test"); + String driver = mySQLContainer.getDriverClassName(); + Class.forName(driver); + + connection = DriverManager.getConnection(jdbcUrl, "test", "test"); + log.info("getConnection: {}, jdbcurl: {}", connection, jdbcUrl); + + // create table + String createTable = "CREATE TABLE " + tableName + + " (field1 TEXT, field2 TEXT, field3 INTEGER, PRIMARY KEY (field3))"; + int ret = connection.createStatement().executeUpdate(createTable); + log.info("created table in jdbc: {}, return value: {}", createTable, ret); + } + + @Override + public void validateSinkResult(Map<String, String> kvs) { + log.info("Query table content from mysql server: {}", tableName); + String querySql = "SELECT * FROM " + tableName; + ResultSet rs; + try { + // backend flush may not complete. 
+ Thread.sleep(1000); + + PreparedStatement statement = connection.prepareStatement(querySql); + rs = statement.executeQuery(); + + while (rs.next()) { + String field1 = rs.getString(1); + String field2 = rs.getString(2); + int field3 = rs.getInt(3); + + String value = kvs.get("key-" + field3); + + Foo obj = schema.decode(value.getBytes()); + assertEquals(obj.field1, field1); + assertEquals(obj.field2, field2); + assertEquals(obj.field3, field3); + } + } catch (Exception e) { + log.error("Got exception: ", e); + fail("Got exception when op sql."); + return; + } + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java index e20a933e4d..147f273abb 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java @@ -24,6 +24,7 @@ import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.MySQLContainer; import org.testng.ITest; import org.testng.annotations.AfterSuite; import org.testng.annotations.BeforeSuite; @@ -51,6 +52,7 @@ protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, Pulsar // register external services Map<String, GenericContainer<?>> externalServices = Maps.newHashMap(); + final String kafkaServiceName = "kafka"; externalServices.put( kafkaServiceName, @@ -60,10 +62,19 @@ protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, Pulsar .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd .withName(kafkaServiceName) .withHostName(clusterName + "-" + kafkaServiceName))); + final String cassandraServiceName = "cassandra"; 
externalServices.put( cassandraServiceName, new CassandraContainer(clusterName)); + + // use mySQL for jdbc test + final String jdbcServiceName = "mysql"; + externalServices.put( + jdbcServiceName, + new MySQLContainer() + .withExposedPorts(3306)); + builder = builder.externalServices(externalServices); return builder; ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services