This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 1cbdb49 [HUDI-251] Adds JDBC source support for DeltaStreamer (#2915) 1cbdb49 is described below commit 1cbdb49816ac779f803d0b7397f911a2f81ecca5 Author: Sagar Sumit <sagarsumi...@gmail.com> AuthorDate: Sat Jun 19 19:42:11 2021 +0530 [HUDI-251] Adds JDBC source support for DeltaStreamer (#2915) As discussed in RFC-14, this change implements the first phase of JDBC incremental puller. It consists following changes: - JdbcSource: This class extends RowSource and implements fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) - SqlQueryBuilder: A simple utility class to build sql queries fluently. - Implements two modes of fetching: full and incremental. Full is a complete scan of RDBMS table. Incremental is delta since last checkpoint. Incremental mode falls back to full fetch in case of any exception. --- .../org/apache/hudi/utilities/SqlQueryBuilder.java | 146 +++++++ .../apache/hudi/utilities/sources/JdbcSource.java | 343 ++++++++++++++++ .../apache/hudi/utilities/TestSqlQueryBuilder.java | 64 +++ .../functional/TestHoodieDeltaStreamer.java | 44 ++ .../hudi/utilities/sources/TestJdbcSource.java | 443 +++++++++++++++++++++ .../hudi/utilities/testutils/JdbcTestUtils.java | 195 +++++++++ 6 files changed, 1235 insertions(+) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/SqlQueryBuilder.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/SqlQueryBuilder.java new file mode 100644 index 0000000..a333f44 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/SqlQueryBuilder.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities; + +import org.apache.hudi.common.util.StringUtils; + +/** + * SQL query builder. Current support for: SELECT, FROM, JOIN, ON, WHERE, ORDER BY, LIMIT clauses. + */ +public class SqlQueryBuilder { + + private final StringBuilder sqlBuilder; + + private SqlQueryBuilder(StringBuilder sqlBuilder) { + this.sqlBuilder = sqlBuilder; + } + + /** + * Creates a SELECT query. + * + * @param columns The column names to select. + * @return The new {@link SqlQueryBuilder} instance. + */ + public static SqlQueryBuilder select(String... columns) { + if (columns == null || columns.length == 0) { + throw new IllegalArgumentException("No columns provided with SELECT statement. Please mention column names or '*' to select all columns."); + } + StringBuilder sqlBuilder = new StringBuilder(); + sqlBuilder.append("select "); + sqlBuilder.append(String.join(", ", columns)); + return new SqlQueryBuilder(sqlBuilder); + } + + /** + * Appends a FROM clause to a query. + * + * @param tables The table names to select from. 
+ * @return The {@link SqlQueryBuilder} instance. + */ + public SqlQueryBuilder from(String... tables) { + if (tables == null || tables.length == 0) { + throw new IllegalArgumentException("No table name provided with FROM clause. Please provide a table name to select from."); + } + sqlBuilder.append(" from "); + sqlBuilder.append(String.join(", ", tables)); + return this; + } + + /** + * Appends a JOIN clause to a query. + * + * @param table The table to join with. + * @return The {@link SqlQueryBuilder} instance. + */ + public SqlQueryBuilder join(String table) { + if (StringUtils.isNullOrEmpty(table)) { + throw new IllegalArgumentException("No table name provided with JOIN clause. Please provide a table name to join with."); + } + sqlBuilder.append(" join "); + sqlBuilder.append(table); + return this; + } + + /** + * Appends an ON clause to a query. + * + * @param predicate The predicate to join on. + * @return The {@link SqlQueryBuilder} instance. + */ + public SqlQueryBuilder on(String predicate) { + if (StringUtils.isNullOrEmpty(predicate)) { + throw new IllegalArgumentException(); + } + sqlBuilder.append(" on "); + sqlBuilder.append(predicate); + return this; + } + + /** + * Appends a WHERE clause to a query. + * + * @param predicate The predicate for WHERE clause. + * @return The {@link SqlQueryBuilder} instance. + */ + public SqlQueryBuilder where(String predicate) { + if (StringUtils.isNullOrEmpty(predicate)) { + throw new IllegalArgumentException("No predicate provided with WHERE clause. Please provide a predicate to filter records."); + } + sqlBuilder.append(" where "); + sqlBuilder.append(predicate); + return this; + } + + /** + * Appends an ORDER BY clause to a query. By default, records are ordered in ascending order by the given column. + * To order in descending order use DESC after the column name, e.g. queryBuilder.orderBy("update_time desc"). + * + * @param columns Column names to order by. + * @return The {@link SqlQueryBuilder} instance. + */ + public SqlQueryBuilder orderBy(String... columns) { + if (columns == null || columns.length == 0) { + throw new IllegalArgumentException("No columns provided with ORDER BY clause. Please provide a column name to order records."); + } + sqlBuilder.append(" order by "); + sqlBuilder.append(String.join(", ", columns)); + return this; + } + + /** + * Appends a "limit" clause to a query. + * + * @param count The limit count. + * @return The {@link SqlQueryBuilder} instance. + */ + public SqlQueryBuilder limit(long count) { + if (count < 0) { + throw new IllegalArgumentException("Please provide a positive integer for the LIMIT clause."); + } + sqlBuilder.append(" limit "); + sqlBuilder.append(count); + return this; + } + + @Override + public String toString() { + return sqlBuilder.toString(); + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java new file mode 100644 index 0000000..58b970f --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
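For illustration, a minimal sketch of how the fluent SqlQueryBuilder is meant to be used; the table, column and predicate values below are made up, and the resulting string follows the lowercase-keyword output the builder produces (the same shape the TestSqlQueryBuilder test asserts):

    String query = SqlQueryBuilder.select("id", "rider", "driver")
        .from("trips")
        .where("last_insert > '2021-06-01 00:00:00'")
        .orderBy("last_insert")
        .limit(100)
        .toString();
    // query == "select id, rider, driver from trips where last_insert > '2021-06-01 00:00:00' order by last_insert limit 100"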
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities.sources; + +import org.apache.hudi.DataSourceUtils; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.utilities.SqlQueryBuilder; +import org.apache.hudi.utilities.schema.SchemaProvider; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.DataFrameReader; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.functions; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.storage.StorageLevel; + +import java.net.URI; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +/** + * Reads data from RDBMS data sources. + */ + +public class JdbcSource extends RowSource { + + private static final Logger LOG = LogManager.getLogger(JdbcSource.class); + private static final List<String> DB_LIMIT_CLAUSE = Arrays.asList("mysql", "postgresql", "h2"); + private static final String URI_JDBC_PREFIX = "jdbc:"; + + public JdbcSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, + SchemaProvider schemaProvider) { + super(props, sparkContext, sparkSession, schemaProvider); + } + + /** + * Validates all user properties and prepares the {@link DataFrameReader} to read from RDBMS. + * + * @param session The {@link SparkSession}. + * @param properties The JDBC connection properties and data source options. 
+ * @return The {@link DataFrameReader} to read from RDBMS + * @throws HoodieException + */ + private static DataFrameReader validatePropsAndGetDataFrameReader(final SparkSession session, + final TypedProperties properties) + throws HoodieException { + DataFrameReader dataFrameReader; + FSDataInputStream passwordFileStream = null; + try { + dataFrameReader = session.read().format("jdbc"); + dataFrameReader = dataFrameReader.option(Config.URL_PROP, properties.getString(Config.URL)); + dataFrameReader = dataFrameReader.option(Config.USER_PROP, properties.getString(Config.USER)); + dataFrameReader = dataFrameReader.option(Config.DRIVER_PROP, properties.getString(Config.DRIVER_CLASS)); + dataFrameReader = dataFrameReader + .option(Config.RDBMS_TABLE_PROP, properties.getString(Config.RDBMS_TABLE_NAME)); + + if (properties.containsKey(Config.PASSWORD)) { + LOG.info("Reading JDBC password from properties file...."); + dataFrameReader = dataFrameReader.option(Config.PASSWORD_PROP, properties.getString(Config.PASSWORD)); + } else if (properties.containsKey(Config.PASSWORD_FILE) + && !StringUtils.isNullOrEmpty(properties.getString(Config.PASSWORD_FILE))) { + LOG.info(String.format("Reading JDBC password from password file %s", properties.getString(Config.PASSWORD_FILE))); + FileSystem fileSystem = FileSystem.get(session.sparkContext().hadoopConfiguration()); + passwordFileStream = fileSystem.open(new Path(properties.getString(Config.PASSWORD_FILE))); + byte[] bytes = new byte[passwordFileStream.available()]; + passwordFileStream.read(bytes); + dataFrameReader = dataFrameReader.option(Config.PASSWORD_PROP, new String(bytes)); + } else { + throw new IllegalArgumentException(String.format("JDBCSource needs either a %s or %s to connect to RDBMS " + + "datasource", Config.PASSWORD_FILE, Config.PASSWORD)); + } + + addExtraJdbcOptions(properties, dataFrameReader); + + if (properties.getBoolean(Config.IS_INCREMENTAL)) { + DataSourceUtils.checkRequiredProperties(properties, Collections.singletonList(Config.INCREMENTAL_COLUMN)); + } + return dataFrameReader; + } catch (Exception e) { + throw new HoodieException("Failed to validate properties", e); + } finally { + IOUtils.closeStream(passwordFileStream); + } + } + + /** + * Accepts spark JDBC options from the user in terms of EXTRA_OPTIONS adds them to {@link DataFrameReader} Example: In + * a normal spark code you would do something like: session.read.format('jdbc') .option(fetchSize,1000) + * .option(timestampFormat,"yyyy-mm-dd hh:mm:ss") + * <p> + * The way to pass these properties to HUDI is through the config file. Any property starting with + * hoodie.deltastreamer.jdbc.extra.options. will be added. + * <p> + * Example: hoodie.deltastreamer.jdbc.extra.options.fetchSize=100 + * hoodie.deltastreamer.jdbc.extra.options.upperBound=1 + * hoodie.deltastreamer.jdbc.extra.options.lowerBound=100 + * + * @param properties The JDBC connection properties and data source options. + * @param dataFrameReader The {@link DataFrameReader} to which data source options will be added. 
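To make the prefix handling concrete, a small sketch (the fetchsize value is illustrative) of how an extra-options key is forwarded to the Spark JDBC reader once the hoodie.deltastreamer.jdbc.extra.options. prefix is stripped:

    // In the DeltaStreamer properties file (value is illustrative):
    //   hoodie.deltastreamer.jdbc.extra.options.fetchsize=1000
    // addExtraJdbcOptions strips the prefix, so the effect is equivalent to:
    dataFrameReader.option("fetchsize", "1000");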
+ */ + private static void addExtraJdbcOptions(TypedProperties properties, DataFrameReader dataFrameReader) { + Set<Object> objects = properties.keySet(); + for (Object property : objects) { + String prop = property.toString(); + if (prop.startsWith(Config.EXTRA_OPTIONS)) { + String key = String.join("", prop.split(Config.EXTRA_OPTIONS)); + String value = properties.getString(prop); + if (!StringUtils.isNullOrEmpty(value)) { + LOG.info(String.format("Adding %s -> %s to jdbc options", key, value)); + dataFrameReader.option(key, value); + } + } + } + } + + @Override + protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) throws HoodieException { + try { + DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.URL, Config.DRIVER_CLASS, Config.USER, Config.RDBMS_TABLE_NAME, Config.IS_INCREMENTAL)); + return fetch(lastCkptStr, sourceLimit); + } catch (HoodieException e) { + LOG.error("Exception while running JDBCSource ", e); + throw e; + } catch (Exception e) { + LOG.error("Exception while running JDBCSource ", e); + throw new HoodieException("Error fetching next batch from JDBC source. Last checkpoint: " + lastCkptStr.orElse(null), e); + } + } + + /** + * Decide to do a full RDBMS table scan or an incremental scan based on the lastCkptStr. If previous checkpoint + * value exists then we do an incremental scan with a PPD query or else we do a full scan. In certain cases where the + * incremental query fails, we fallback to a full scan. + * + * @param lastCkptStr Last checkpoint. + * @return The pair of {@link Dataset} and current checkpoint. + */ + private Pair<Option<Dataset<Row>>, String> fetch(Option<String> lastCkptStr, long sourceLimit) { + Dataset<Row> dataset; + if (lastCkptStr.isPresent() && !StringUtils.isNullOrEmpty(lastCkptStr.get())) { + dataset = incrementalFetch(lastCkptStr, sourceLimit); + } else { + LOG.info("No checkpoint references found. Doing a full rdbms table fetch"); + dataset = fullFetch(sourceLimit); + } + dataset.persist(StorageLevel.fromString(props.getString(Config.STORAGE_LEVEL, "MEMORY_AND_DISK_SER"))); + boolean isIncremental = props.getBoolean(Config.IS_INCREMENTAL); + Pair<Option<Dataset<Row>>, String> pair = Pair.of(Option.of(dataset), checkpoint(dataset, isIncremental, lastCkptStr)); + dataset.unpersist(); + return pair; + } + + /** + * Does an incremental scan with PPQ query prepared on the bases of previous checkpoint. + * + * @param lastCheckpoint Last checkpoint. + * Note that the records fetched will be exclusive of the last checkpoint (i.e. incremental column value > lastCheckpoint). + * @return The {@link Dataset} after incremental fetch from RDBMS. 
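As a sketch of the query shape (table, column and checkpoint values here are illustrative): with incremental column last_insert, table triprec, last checkpoint '2021-06-19 10:00:00' and a positive sourceLimit against a mysql, postgresql or h2 URL, the builder produces a query along the lines of

    select * from triprec where last_insert > '2021-06-19 10:00:00' order by last_insert limit 100

which is then wrapped as "(...) rdbms_table" and handed to Spark as the dbtable option. The checkpoint carried forward is the string-cast maximum of the incremental column in the fetched batch.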
+ */ + private Dataset<Row> incrementalFetch(Option<String> lastCheckpoint, long sourceLimit) { + try { + final String ppdQuery = "(%s) rdbms_table"; + final SqlQueryBuilder queryBuilder = SqlQueryBuilder.select("*") + .from(props.getString(Config.RDBMS_TABLE_NAME)) + .where(String.format(" %s > '%s'", props.getString(Config.INCREMENTAL_COLUMN), lastCheckpoint.get())); + + if (sourceLimit > 0) { + URI jdbcURI = URI.create(props.getString(Config.URL).substring(URI_JDBC_PREFIX.length())); + if (DB_LIMIT_CLAUSE.contains(jdbcURI.getScheme())) { + queryBuilder.orderBy(props.getString(Config.INCREMENTAL_COLUMN)).limit(sourceLimit); + } + } + String query = String.format(ppdQuery, queryBuilder.toString()); + LOG.info("PPD QUERY: " + query); + LOG.info(String.format("Referenced last checkpoint and prepared new predicate pushdown query for jdbc pull %s", query)); + return validatePropsAndGetDataFrameReader(sparkSession, props).option(Config.RDBMS_TABLE_PROP, query).load(); + } catch (Exception e) { + LOG.error("Error while performing an incremental fetch. Not all database support the PPD query we generate to do an incremental scan", e); + if (props.containsKey(Config.FALLBACK_TO_FULL_FETCH) && props.getBoolean(Config.FALLBACK_TO_FULL_FETCH)) { + LOG.warn("Falling back to full scan."); + return fullFetch(sourceLimit); + } + throw e; + } + } + + /** + * Does a full scan on the RDBMS data source. + * + * @return The {@link Dataset} after running full scan. + */ + private Dataset<Row> fullFetch(long sourceLimit) { + final String ppdQuery = "(%s) rdbms_table"; + final SqlQueryBuilder queryBuilder = SqlQueryBuilder.select("*") + .from(props.getString(Config.RDBMS_TABLE_NAME)); + if (sourceLimit > 0) { + URI jdbcURI = URI.create(props.getString(Config.URL).substring(URI_JDBC_PREFIX.length())); + if (DB_LIMIT_CLAUSE.contains(jdbcURI.getScheme())) { + if (props.containsKey(Config.INCREMENTAL_COLUMN)) { + queryBuilder.orderBy(props.getString(Config.INCREMENTAL_COLUMN)).limit(sourceLimit); + } else { + queryBuilder.limit(sourceLimit); + } + } + } + String query = String.format(ppdQuery, queryBuilder.toString()); + return validatePropsAndGetDataFrameReader(sparkSession, props).option(Config.RDBMS_TABLE_PROP, query).load(); + } + + private String checkpoint(Dataset<Row> rowDataset, boolean isIncremental, Option<String> lastCkptStr) { + try { + if (isIncremental) { + Column incrementalColumn = rowDataset.col(props.getString(Config.INCREMENTAL_COLUMN)); + final String max = rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first().getString(0); + LOG.info(String.format("Checkpointing column %s with value: %s ", incrementalColumn, max)); + if (max != null) { + return max; + } + return lastCkptStr.isPresent() && !StringUtils.isNullOrEmpty(lastCkptStr.get()) ? lastCkptStr.get() : StringUtils.EMPTY_STRING; + } else { + return StringUtils.EMPTY_STRING; + } + } catch (Exception e) { + LOG.error("Failed to checkpoint"); + throw new HoodieException("Failed to checkpoint. Last checkpoint: " + lastCkptStr.orElse(null), e); + } + } + + /** + * Inner class with config keys. + */ + protected static class Config { + + /** + * {@value #URL} is the jdbc url for the Hoodie datasource. + */ + private static final String URL = "hoodie.deltastreamer.jdbc.url"; + + private static final String URL_PROP = "url"; + + /** + * {@value #USER} is the username used for JDBC connection. 
+ */ + private static final String USER = "hoodie.deltastreamer.jdbc.user"; + + /** + * {@value #USER_PROP} used internally to build jdbc params. + */ + private static final String USER_PROP = "user"; + + /** + * {@value #PASSWORD} is the password used for JDBC connection. + */ + private static final String PASSWORD = "hoodie.deltastreamer.jdbc.password"; + + /** + * {@value #PASSWORD_FILE} is the base-path for the JDBC password file. + */ + private static final String PASSWORD_FILE = "hoodie.deltastreamer.jdbc.password.file"; + + /** + * {@value #PASSWORD_PROP} used internally to build jdbc params. + */ + private static final String PASSWORD_PROP = "password"; + + /** + * {@value #DRIVER_CLASS} used for JDBC connection. + */ + private static final String DRIVER_CLASS = "hoodie.deltastreamer.jdbc.driver.class"; + + /** + * {@value #DRIVER_PROP} used internally to build jdbc params. + */ + private static final String DRIVER_PROP = "driver"; + + /** + * {@value #RDBMS_TABLE_NAME} RDBMS table to pull. + */ + private static final String RDBMS_TABLE_NAME = "hoodie.deltastreamer.jdbc.table.name"; + + /** + * {@value #RDBMS_TABLE_PROP} used internally for jdbc. + */ + private static final String RDBMS_TABLE_PROP = "dbtable"; + + /** + * {@value #INCREMENTAL_COLUMN} if ran in incremental mode, this field will be used to pull new data incrementally. + */ + private static final String INCREMENTAL_COLUMN = "hoodie.deltastreamer.jdbc.table.incr.column.name"; + + /** + * {@value #IS_INCREMENTAL} will the JDBC source do an incremental pull? + */ + private static final String IS_INCREMENTAL = "hoodie.deltastreamer.jdbc.incr.pull"; + + /** + * {@value #EXTRA_OPTIONS} used to set any extra options the user specifies for jdbc. + */ + private static final String EXTRA_OPTIONS = "hoodie.deltastreamer.jdbc.extra.options."; + + /** + * {@value #STORAGE_LEVEL} is used to control the persistence level. Default value: MEMORY_AND_DISK_SER. + */ + private static final String STORAGE_LEVEL = "hoodie.deltastreamer.jdbc.storage.level"; + + /** + * {@value #FALLBACK_TO_FULL_FETCH} is a boolean, which if set true, makes incremental fetch to fallback to full fetch in case of any error. + */ + private static final String FALLBACK_TO_FULL_FETCH = "hoodie.deltastreamer.jdbc.incr.fallback.to.full.fetch"; + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSqlQueryBuilder.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSqlQueryBuilder.java new file mode 100644 index 0000000..bfc66ae --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSqlQueryBuilder.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
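Pulling the config keys above together, a minimal illustrative properties sketch for an incremental pull (host, database, table and column names are placeholders):

    hoodie.deltastreamer.jdbc.url=jdbc:postgresql://db-host:5432/inventory
    hoodie.deltastreamer.jdbc.driver.class=org.postgresql.Driver
    hoodie.deltastreamer.jdbc.user=hudi
    # either a plain password or a password file on DFS must be provided
    hoodie.deltastreamer.jdbc.password.file=hdfs:///configs/jdbc.password
    hoodie.deltastreamer.jdbc.table.name=orders
    hoodie.deltastreamer.jdbc.incr.pull=true
    hoodie.deltastreamer.jdbc.table.incr.column.name=updated_at
    hoodie.deltastreamer.jdbc.incr.fallback.to.full.fetch=true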
+ */ + +package org.apache.hudi.utilities; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Tests {@link SqlQueryBuilder}. + */ +public class TestSqlQueryBuilder { + + @Test + public void testSelect() { + String sql = SqlQueryBuilder.select("id", "rider", "time") + .from("trips") + .join("users").on("trips.rider = users.id") + .where("(trips.time > 100 or trips.time < 200)") + .orderBy("id", "time") + .limit(10).toString(); + + assertEquals("select id, rider, time from trips " + + "join users on trips.rider = users.id " + + "where (trips.time > 100 or trips.time < 200) " + + "order by id, time " + + "limit 10", sql); + } + + @Test + public void testIncorrectQueries() { + assertThrows(IllegalArgumentException.class, () -> SqlQueryBuilder.select().toString()); + + assertThrows(IllegalArgumentException.class, () -> SqlQueryBuilder.select("*").from().toString()); + + assertThrows(IllegalArgumentException.class, () -> SqlQueryBuilder.select("id").from("trips").where("").toString()); + + assertThrows(IllegalArgumentException.class, () -> SqlQueryBuilder.select("id").from("trips").join("").toString()); + + assertThrows(IllegalArgumentException.class, () -> SqlQueryBuilder.select("id").from("trips").join("riders").on("").toString()); + + assertThrows(IllegalArgumentException.class, () -> SqlQueryBuilder.select("id").from("trips").join("riders").where("id > 0").orderBy().toString()); + + assertThrows(IllegalArgumentException.class, () -> SqlQueryBuilder.select("id").from("trips").join("riders").where("id > 0").orderBy("id").limit(-1).toString()); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index eb3de6d..64fc531 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -18,6 +18,8 @@ package org.apache.hudi.utilities.functional; +import java.sql.Connection; +import java.sql.DriverManager; import java.util.ConcurrentModificationException; import java.util.concurrent.ExecutorService; import org.apache.hudi.DataSourceWriteOptions; @@ -50,10 +52,12 @@ import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.sources.CsvDFSSource; import org.apache.hudi.utilities.sources.HoodieIncrSource; import org.apache.hudi.utilities.sources.InputBatch; +import org.apache.hudi.utilities.sources.JdbcSource; import org.apache.hudi.utilities.sources.JsonKafkaSource; import org.apache.hudi.utilities.sources.ParquetDFSSource; import org.apache.hudi.utilities.sources.TestDataSource; import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config; +import org.apache.hudi.utilities.testutils.JdbcTestUtils; import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource; import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs; @@ -109,6 +113,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; /** * Basic 
tests against {@link HoodieDeltaStreamer}, by issuing bulk_inserts, upserts, inserts. Check counts at the end. @@ -1594,6 +1599,45 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { testCsvDFSSource(false, '\t', true, Collections.singletonList(TripsWithDistanceTransformer.class.getName())); } + @Test + public void testJdbcSourceIncrementalFetchInContinuousMode() { + try (Connection connection = DriverManager.getConnection("jdbc:h2:mem:test_mem", "test", "jdbc")) { + TypedProperties props = new TypedProperties(); + props.setProperty("hoodie.deltastreamer.jdbc.url", "jdbc:h2:mem:test_mem"); + props.setProperty("hoodie.deltastreamer.jdbc.driver.class", "org.h2.Driver"); + props.setProperty("hoodie.deltastreamer.jdbc.user", "test"); + props.setProperty("hoodie.deltastreamer.jdbc.password", "jdbc"); + props.setProperty("hoodie.deltastreamer.jdbc.table.name", "triprec"); + props.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true"); + props.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "id"); + + props.setProperty("hoodie.datasource.write.keygenerator.class", SimpleKeyGenerator.class.getName()); + props.setProperty("hoodie.datasource.write.recordkey.field", "ID"); + props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); + + UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/test-jdbc-source.properties"); + + int numRecords = 1000; + int sourceLimit = 100; + String tableBasePath = dfsBasePath + "/triprec"; + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, JdbcSource.class.getName(), + null, "test-jdbc-source.properties", false, + false, sourceLimit, false, null, null, "timestamp"); + cfg.continuousMode = true; + // Add 1000 records + JdbcTestUtils.clearAndInsert("000", numRecords, connection, new HoodieTestDataGenerator(), props); + + HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc); + deltaStreamerTestRunner(deltaStreamer, cfg, (r) -> { + TestHelpers.assertAtleastNCompactionCommits(numRecords / sourceLimit + ((numRecords % sourceLimit == 0) ? 0 : 1), tableBasePath, dfs); + TestHelpers.assertRecordCount(numRecords, tableBasePath + "/*/*.parquet", sqlContext); + return true; + }); + } catch (Exception e) { + fail(e.getMessage()); + } + } + /** * UDF to calculate Haversine distance. */ diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java new file mode 100644 index 0000000..62aebe3 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java @@ -0,0 +1,443 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
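A note on the numbers in the continuous-mode test above: with 1000 source rows and sourceLimit set to 100, each incremental round pulls at most 100 rows ordered by the id checkpoint column, so draining the table takes at least 1000 / 100 = 10 rounds, which is what the commit-count assertion checks before comparing the total record count.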
+ */ + +package org.apache.hudi.utilities.sources; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.utilities.testutils.UtilitiesTestBase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.functions; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.storage.StorageLevel; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.stream.Collectors; + +import static org.apache.hudi.utilities.testutils.JdbcTestUtils.clearAndInsert; +import static org.apache.hudi.utilities.testutils.JdbcTestUtils.close; +import static org.apache.hudi.utilities.testutils.JdbcTestUtils.count; +import static org.apache.hudi.utilities.testutils.JdbcTestUtils.insert; +import static org.apache.hudi.utilities.testutils.JdbcTestUtils.update; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +/** + * Tests {@link JdbcSource}. + */ +public class TestJdbcSource extends UtilitiesTestBase { + + private static final TypedProperties PROPS = new TypedProperties(); + private static final HoodieTestDataGenerator DATA_GENERATOR = new HoodieTestDataGenerator(); + private static Connection connection; + + @BeforeEach + public void setup() throws Exception { + super.setup(); + PROPS.setProperty("hoodie.deltastreamer.jdbc.url", "jdbc:h2:mem:test_mem"); + PROPS.setProperty("hoodie.deltastreamer.jdbc.driver.class", "org.h2.Driver"); + PROPS.setProperty("hoodie.deltastreamer.jdbc.user", "test"); + PROPS.setProperty("hoodie.deltastreamer.jdbc.password", "jdbc"); + PROPS.setProperty("hoodie.deltastreamer.jdbc.table.name", "triprec"); + connection = DriverManager.getConnection("jdbc:h2:mem:test_mem", "test", "jdbc"); + } + + @AfterEach + public void teardown() throws Exception { + super.teardown(); + close(connection); + } + + @Test + public void testSingleCommit() { + PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true"); + PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert"); + + try { + int numRecords = 100; + String commitTime = "000"; + + // Insert 100 records with commit time + clearAndInsert(commitTime, numRecords, connection, DATA_GENERATOR, PROPS); + + // Validate if we have specified records in db + assertEquals(numRecords, count(connection, "triprec")); + + // Start JdbcSource + Dataset<Row> rowDataset = runSource(Option.empty(), numRecords).getBatch().get(); + assertEquals(numRecords, rowDataset.count()); + } catch (SQLException e) { + fail(e.getMessage()); + } + } + + @Test + public void testInsertAndUpdate() { + PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true"); + PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert"); + + try { + final String commitTime = "000"; + final int numRecords = 
100; + + // Add 100 records. Update half of them with commit time "007". + update("007", + clearAndInsert(commitTime, numRecords, connection, DATA_GENERATOR, PROPS) + .stream() + .limit(50) + .collect(Collectors.toList()), + connection, DATA_GENERATOR, PROPS + ); + // Check if database has 100 records + assertEquals(numRecords, count(connection, "triprec")); + + // Start JdbcSource + Dataset<Row> rowDataset = runSource(Option.empty(), 100).getBatch().get(); + assertEquals(100, rowDataset.count()); + + Dataset<Row> firstCommit = rowDataset.where("commit_time=000"); + assertEquals(50, firstCommit.count()); + + Dataset<Row> secondCommit = rowDataset.where("commit_time=007"); + assertEquals(50, secondCommit.count()); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testTwoCommits() { + PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true"); + PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert"); + + try { + // Add 10 records with commit time "000" + clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS); + + // Start JdbcSource + Dataset<Row> rowDataset = runSource(Option.empty(), 10).getBatch().get(); + assertEquals(10, rowDataset.where("commit_time=000").count()); + + // Add 10 records with commit time 001 + insert("001", 5, connection, DATA_GENERATOR, PROPS); + rowDataset = runSource(Option.empty(), 15).getBatch().get(); + assertEquals(15, rowDataset.count()); + assertEquals(5, rowDataset.where("commit_time=001").count()); + assertEquals(10, rowDataset.where("commit_time=000").count()); + + // Start second commit and check if all records are pulled + rowDataset = runSource(Option.empty(), 15).getBatch().get(); + assertEquals(15, rowDataset.count()); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testIncrementalFetchWithCommitTime() { + PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true"); + PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert"); + + try { + // Add 10 records with commit time "000" + clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS); + + // Start JdbcSource + InputBatch<Dataset<Row>> batch = runSource(Option.empty(), 10); + Dataset<Row> rowDataset = batch.getBatch().get(); + assertEquals(10, rowDataset.count()); + + // Add 10 records with commit time "001" + insert("001", 10, connection, DATA_GENERATOR, PROPS); + + // Start incremental scan + rowDataset = runSource(Option.of(batch.getCheckpointForNextBatch()), 10).getBatch().get(); + assertEquals(10, rowDataset.count()); + assertEquals(10, rowDataset.where("commit_time=001").count()); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testIncrementalFetchWithNoMatchingRows() { + PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true"); + PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert"); + + try { + // Add 10 records with commit time "000" + clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS); + + // Start JdbcSource + InputBatch<Dataset<Row>> batch = runSource(Option.empty(), 10); + Dataset<Row> rowDataset = batch.getBatch().get(); + assertEquals(10, rowDataset.count()); + + // Start incremental scan + rowDataset = runSource(Option.of(batch.getCheckpointForNextBatch()), 10).getBatch().get(); + assertEquals(0, rowDataset.count()); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testIncrementalFetchWhenTableRecordsMoreThanSourceLimit() { + 
PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true"); + PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "id"); + + try { + // Add 100 records with commit time "000" + clearAndInsert("000", 100, connection, DATA_GENERATOR, PROPS); + + // Start JdbcSource + InputBatch<Dataset<Row>> batch = runSource(Option.empty(), 100); + Dataset<Row> rowDataset = batch.getBatch().get(); + assertEquals(100, rowDataset.count()); + + // Add 100 records with commit time "001" + insert("001", 100, connection, DATA_GENERATOR, PROPS); + + // Start incremental scan. Now there are 100 more records but with sourceLimit set to 60, only fetch 60 records should be fetched. + // Those 50 records should be of the commit_time=001 because records with commit_time=000 have already been processed. + batch = runSource(Option.of(batch.getCheckpointForNextBatch()), 60); + rowDataset = batch.getBatch().get(); + assertEquals(60, rowDataset.count()); + assertEquals(60, rowDataset.where("commit_time=001").count()); + // No more records added, but sourceLimit is now set to 75. Still, only the remaining 40 records should be fetched. + rowDataset = runSource(Option.of(batch.getCheckpointForNextBatch()), 75).getBatch().get(); + assertEquals(40, rowDataset.count()); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testIncrementalFetchWhenLastCheckpointMoreThanTableRecords() { + PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true"); + PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "id"); + + try { + // Add 100 records with commit time "000" + clearAndInsert("000", 100, connection, DATA_GENERATOR, PROPS); + + // Start JdbcSource + InputBatch<Dataset<Row>> batch = runSource(Option.empty(), 100); + Dataset<Row> rowDataset = batch.getBatch().get(); + assertEquals(100, rowDataset.count()); + assertEquals("100", batch.getCheckpointForNextBatch()); + + // Add 100 records with commit time "001" + insert("001", 100, connection, DATA_GENERATOR, PROPS); + + // Start incremental scan. With checkpoint greater than the number of records, there should not be any dataset to fetch. + batch = runSource(Option.of("200"), 50); + rowDataset = batch.getBatch().get(); + assertEquals(0, rowDataset.count()); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testIncrementalFetchFallbackToFullFetchWhenError() { + PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true"); + PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert"); + + try { + // Add 10 records with commit time "000" + clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS); + + // Start JdbcSource + InputBatch<Dataset<Row>> batch = runSource(Option.empty(), 10); + Dataset<Row> rowDataset = batch.getBatch().get(); + assertEquals(10, rowDataset.count()); + + // Add 10 records with commit time "001" + insert("001", 10, connection, DATA_GENERATOR, PROPS); + + PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "dummy_col"); + assertThrows(HoodieException.class, () -> { + // Start incremental scan with a dummy column that does not exist. + // This will throw an exception as the default behavior is to not fallback to full fetch. + runSource(Option.of(batch.getCheckpointForNextBatch()), -1); + }); + + PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.fallback.to.full.fetch", "true"); + + // Start incremental scan with a dummy column that does not exist. 
+ // This will fallback to full fetch mode but still throw an exception checkpointing will fail. + Exception exception = assertThrows(HoodieException.class, () -> { + runSource(Option.of(batch.getCheckpointForNextBatch()), -1); + }); + assertTrue(exception.getMessage().contains("Failed to checkpoint")); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testFullFetchWithCommitTime() { + PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false"); + + try { + // Add 10 records with commit time "000" + clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS); + + // Start JdbcSource + Dataset<Row> rowDataset = runSource(Option.empty(), 10).getBatch().get(); + assertEquals(10, rowDataset.count()); + // Add 10 records with commit time "001" + insert("001", 10, connection, DATA_GENERATOR, PROPS); + + // Start full fetch + rowDataset = runSource(Option.empty(), 20).getBatch().get(); + assertEquals(20, rowDataset.count()); + assertEquals(10, rowDataset.where("commit_time=000").count()); + assertEquals(10, rowDataset.where("commit_time=001").count()); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testFullFetchWithCheckpoint() { + PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false"); + PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert"); + + try { + // Add 10 records with commit time "000" + clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS); + + // Start JdbcSource + InputBatch<Dataset<Row>> batch = runSource(Option.empty(), 10); + Dataset<Row> rowDataset = batch.getBatch().get(); + assertEquals(10, rowDataset.count()); + assertEquals("", batch.getCheckpointForNextBatch()); + + // Get max of incremental column + Column incrementalColumn = rowDataset + .col(PROPS.getString("hoodie.deltastreamer.jdbc.table.incr.column.name")); + final String max = rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first() + .getString(0); + + // Add 10 records with commit time "001" + insert("001", 10, connection, DATA_GENERATOR, PROPS); + + // Start incremental scan + rowDataset = runSource(Option.of(max), 10).getBatch().get(); + assertEquals(10, rowDataset.count()); + assertEquals(10, rowDataset.where("commit_time=001").count()); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testSourceWithPasswordOnFs() { + try { + // Write secret string to fs in a file + writeSecretToFs(); + // Remove secret string from props + PROPS.remove("hoodie.deltastreamer.jdbc.password"); + // Set property to read secret from fs file + PROPS.setProperty("hoodie.deltastreamer.jdbc.password.file", "file:///tmp/hudi/config/secret"); + PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false"); + // Add 10 records with commit time 000 + clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS); + Dataset<Row> rowDataset = runSource(Option.empty(), 10).getBatch().get(); + assertEquals(10, rowDataset.count()); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testSourceWithNoPasswordThrowsException() { + assertThrows(HoodieException.class, () -> { + // Write secret string to fs in a file + writeSecretToFs(); + // Remove secret string from props + PROPS.remove("hoodie.deltastreamer.jdbc.password"); + PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false"); + // Add 10 records with commit time 000 + clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS); + runSource(Option.empty(), 10); + }); + } + + 
@Test + public void testSourceWithExtraOptions() { + PROPS.setProperty("hoodie.deltastreamer.jdbc.extra.options.fetchsize", "10"); + PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false"); + PROPS.remove("hoodie.deltastreamer.jdbc.table.incr.column.name"); + try { + // Add 20 records with commit time 000 + clearAndInsert("000", 20, connection, DATA_GENERATOR, PROPS); + Dataset<Row> rowDataset = runSource(Option.empty(), 10).getBatch().get(); + assertEquals(10, rowDataset.count()); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testSourceWithStorageLevel() { + PROPS.setProperty("hoodie.deltastreamer.jdbc.storage.level", "NONE"); + PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false"); + try { + // Add 10 records with commit time 000 + clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS); + Dataset<Row> rowDataset = runSource(Option.empty(), 10).getBatch().get(); + assertEquals(10, rowDataset.count()); + assertEquals(StorageLevel.NONE(), rowDataset.storageLevel()); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + private void writeSecretToFs() throws IOException { + FileSystem fs = FileSystem.get(new Configuration()); + FSDataOutputStream outputStream = fs.create(new Path("file:///tmp/hudi/config/secret")); + outputStream.writeBytes("jdbc"); + outputStream.close(); + } + + private InputBatch<Dataset<Row>> runSource(Option<String> lastCkptStr, long sourceLimit) { + Source<Dataset<Row>> jdbcSource = new JdbcSource(PROPS, jsc, sparkSession, null); + return jdbcSource.fetchNewData(lastCkptStr, sourceLimit); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/JdbcTestUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/JdbcTestUtils.java new file mode 100644 index 0000000..377063e --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/JdbcTestUtils.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities.testutils; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; + +import org.apache.avro.generic.GenericRecord; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.Objects; + +/** + * Helper class used in testing {@link org.apache.hudi.utilities.sources.JdbcSource}. 
+ */ +public class JdbcTestUtils { + + private static final Logger LOG = LogManager.getLogger(JdbcTestUtils.class); + + public static List<HoodieRecord> clearAndInsert(String commitTime, int numRecords, Connection connection, HoodieTestDataGenerator dataGenerator, TypedProperties props) + throws SQLException { + execute(connection, "DROP TABLE triprec", "Table does not exists"); + execute(connection, "CREATE TABLE triprec (" + + "id INT NOT NULL AUTO_INCREMENT(1, 1)," + + "commit_time VARCHAR(50)," + + "_row_key VARCHAR(50)," + + "rider VARCHAR(50)," + + "driver VARCHAR(50)," + + "begin_lat DOUBLE PRECISION," + + "begin_lon DOUBLE PRECISION," + + "end_lat DOUBLE PRECISION," + + "end_lon DOUBLE PRECISION," + + "fare DOUBLE PRECISION," + + "last_insert TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP)", "Table already exists"); + + return insert(commitTime, numRecords, connection, dataGenerator, props); + } + + public static List<HoodieRecord> insert(String commitTime, int numRecords, Connection connection, HoodieTestDataGenerator dataGenerator, TypedProperties props) throws SQLException { + PreparedStatement insertStatement = + connection.prepareStatement("INSERT INTO triprec (" + + "commit_time," + + "_row_key," + + "rider," + + "driver," + + "begin_lat," + + "begin_lon," + + "end_lat," + + "end_lon," + + "fare) " + + "values(?,?,?,?,?,?,?,?,?)"); + List<HoodieRecord> hoodieRecords = dataGenerator.generateInserts(commitTime, numRecords); + + hoodieRecords + .stream() + .map(r -> { + try { + return ((GenericRecord) r.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA, props).get()); + } catch (IOException e) { + return null; + } + }) + .filter(Objects::nonNull) + .forEach(record -> { + try { + insertStatement.setString(1, commitTime); + insertStatement.setString(2, record.get("_row_key").toString()); + insertStatement.setString(3, record.get("rider").toString()); + insertStatement.setString(4, record.get("driver").toString()); + insertStatement.setDouble(5, Double.parseDouble(record.get("begin_lat").toString())); + insertStatement.setDouble(6, Double.parseDouble(record.get("begin_lon").toString())); + insertStatement.setDouble(7, Double.parseDouble(record.get("end_lat").toString())); + insertStatement.setDouble(8, Double.parseDouble(record.get("end_lon").toString())); + insertStatement.setDouble(9, Double.parseDouble(((GenericRecord) record.get("fare")).get("amount").toString())); + insertStatement.addBatch(); + } catch (SQLException e) { + LOG.warn(e.getMessage()); + } + }); + insertStatement.executeBatch(); + close(insertStatement); + return hoodieRecords; + } + + public static List<HoodieRecord> update(String commitTime, List<HoodieRecord> inserts, Connection connection, HoodieTestDataGenerator dataGenerator, TypedProperties props) + throws SQLException, IOException { + PreparedStatement updateStatement = + connection.prepareStatement("UPDATE triprec set commit_time=?," + + "_row_key=?," + + "rider=?," + + "driver=?," + + "begin_lat=?," + + "begin_lon=?," + + "end_lat=?," + + "end_lon=?," + + "fare=?" 
+ + "where _row_key=?"); + + List<HoodieRecord> updateRecords = dataGenerator.generateUpdates(commitTime, inserts); + updateRecords.stream().map(m -> { + try { + return m.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA, props).get(); + } catch (IOException e) { + return null; + } + }).filter(Objects::nonNull) + .map(r -> ((GenericRecord) r)) + .sequential() + .forEach(r -> { + try { + updateStatement.setString(1, commitTime); + updateStatement.setString(2, r.get("_row_key").toString()); + updateStatement.setString(3, r.get("rider").toString()); + updateStatement.setString(4, r.get("driver").toString()); + updateStatement.setDouble(5, Double.parseDouble(r.get("begin_lat").toString())); + updateStatement.setDouble(6, Double.parseDouble(r.get("begin_lon").toString())); + updateStatement.setDouble(7, Double.parseDouble(r.get("end_lat").toString())); + updateStatement.setDouble(8, Double.parseDouble(r.get("end_lon").toString())); + updateStatement.setDouble(9, Double.parseDouble(((GenericRecord) r.get("fare")).get("amount").toString())); + updateStatement.setString(10, r.get("_row_key").toString()); + updateStatement.addBatch(); + } catch (SQLException e) { + LOG.warn(e.getMessage()); + } + }); + updateStatement.executeBatch(); + close(updateStatement); + return updateRecords; + } + + private static void execute(Connection connection, String query, String message) { + try (Statement statement = connection.createStatement()) { + statement.executeUpdate(query); + } catch (SQLException e) { + LOG.error(message); + } + } + + private static void close(Statement statement) { + try { + if (statement != null) { + statement.close(); + } + } catch (SQLException e) { + LOG.error("Error while closing statement. " + e.getMessage()); + } + } + + public static void close(Connection connection) { + try { + if (connection != null) { + connection.close(); + } + } catch (SQLException e) { + LOG.error("Error while closing connection. " + e.getMessage()); + } + } + + public static int count(Connection connection, String tableName) { + try (Statement statement = connection.createStatement()) { + ResultSet rs = statement.executeQuery(String.format("select count(*) from %s", tableName)); + rs.next(); + return rs.getInt(1); + } catch (SQLException e) { + LOG.warn("Error while counting records. " + e.getMessage()); + return 0; + } + } +}