This is an automated email from the ASF dual-hosted git repository.
wankun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/master by this push:
new ee82cbd [GRIFFIN-315] Adding JDBC based data connector
ee82cbd is described below
commit ee82cbd989fcbb73e9043595f6e0dc3e03aa992f
Author: tusharpatil20 <[email protected]>
AuthorDate: Tue Dec 31 10:28:00 2019 +0800
[GRIFFIN-315] Adding JDBC based data connector
**What changes were proposed in this pull request?**
Adds a JDBC based data connector that can read data from various JDBC compatible data sources (e.g. Oracle, Postgres).
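A source of type "jdbc" can then be declared as in the sketch below (a minimal illustration mirroring the new test suite; the H2 URL, credentials, and the active `spark` session are assumed values, not requirements):

```scala
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.datasource.connector.batch.JDBCBasedDataConnector

// Configuration keys supported by the new connector.
val configs = Map(
  "database" -> "griffin",     // optional, defaults to "default"
  "tablename" -> "employee",   // mandatory
  "url" -> "jdbc:h2:mem:test", // mandatory JDBC URL (H2 assumed here)
  "user" -> "user",            // mandatory
  "password" -> "password",    // mandatory
  "driver" -> "org.h2.Driver", // optional, defaults to com.mysql.jdbc.Driver
  "where" -> "id=1")           // optional filter on the table
val dcParam = DataConnectorParam("jdbc", "1", "test_df", configs, Nil)
val connector = JDBCBasedDataConnector(spark, dcParam, TimestampStorage())
// Returns an Option[DataFrame] and the associated TimeRange.
val (dfOpt, timeRange) = connector.data(System.currentTimeMillis())
```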
**Does this PR introduce any user-facing change?**
No
**How was this patch tested?**
Added JDBCBasedDataConnectorTest to the Griffin test suite.
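It runs the connector against an in-memory H2 database, with and without a where condition, and verifies that mandatory configuration is enforced. Internally the connector delegates to Spark's JDBC reader, wrapping the generated select in a subquery, roughly as in this sketch (table name, credentials, and driver taken from the test fixtures):

```scala
// Approximately the read that JDBCBasedDataConnector.data performs
// for database=griffin, tablename=employee, where="id=1":
val prop = new java.util.Properties
prop.setProperty("user", "user")
prop.setProperty("password", "password")
prop.setProperty("driver", "org.h2.Driver")
val df = spark.read.jdbc(
  "jdbc:h2:mem:test",
  "(SELECT * FROM griffin.employee WHERE id=1) as t",
  prop)
```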
Author: tusharpatil20 <[email protected]>
Closes #561 from tusharpatil20/JDBCBased-source-connector.
---
measure/pom.xml | 6 +
.../connector/DataConnectorFactory.scala | 2 +
.../connector/batch/JDBCBasedDataConnector.scala | 127 +++++++++++++++++
.../connector/batch/MySqlDataConnector.scala | 3 +
.../batch/JDBCBasedDataConnectorTest.scala | 151 +++++++++++++++++++++
5 files changed, 289 insertions(+)
diff --git a/measure/pom.xml b/measure/pom.xml
index ce198bb..be98435 100644
--- a/measure/pom.xml
+++ b/measure/pom.xml
@@ -201,6 +201,12 @@ under the License.
<artifactId>httpclient</artifactId>
<version>4.5.9</version>
</dependency>
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <version>1.4.200</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
index 2578470..deb9bd8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -40,6 +40,7 @@ object DataConnectorFactory extends Loggable {
val KafkaRegex: Regex = """^(?i)kafka$""".r
val CustomRegex: Regex = """^(?i)custom$""".r
+ val JDBCRegex: Regex = """^(?i)jdbc$""".r
/**
* create data connector
@@ -72,6 +73,7 @@ object DataConnectorFactory extends Loggable {
dcParam,
tmstCache,
streamingCacheClientOpt)
+ case JDBCRegex() => JDBCBasedDataConnector(sparkSession, dcParam, tmstCache)
case _ => throw new Exception("connector creation error!")
}
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala
new file mode 100644
index 0000000..e7efa6e
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.griffin.measure.datasource.connector.batch
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+ * A batch data connector for JDBC sources, which supports various
+ * JDBC based data sources like Oracle, Postgres, etc.
+ *
+ * Supported Configurations:
+ * - database : [[String]] specifying the database name.
+ * - tablename : [[String]] specifying the table name to be read
+ * - url : [[String]] specifying the URL to connect to database
+ * - user : [[String]] specifying the user for connection to database
+ * - password: [[String]] specifying the password for connection to database
+ * - driver : [[String]] specifying the driver for JDBC connection to database
+ * - where : [[String]] specifying the condition for reading data from table
+ *
+ * Some defaults assumed by this connector (if not set) are as follows:
+ * - `database` defaults to `default`,
+ * - `driver` defaults to `com.mysql.jdbc.Driver`,
+ * - `where` defaults to an empty string (no filter)
+ */
+case class JDBCBasedDataConnector(
+ @transient sparkSession: SparkSession,
+ dcParam: DataConnectorParam,
+ timestampStorage: TimestampStorage)
+ extends BatchDataConnector {
+
+ import JDBCBasedDataConnector._
+ val config: Map[String, Any] = dcParam.getConfig
+ val database: String = config.getString(Database, DefaultDatabase)
+ val tableName: String = config.getString(TableName, EmptyString)
+ val fullTableName: String = s"$database.$tableName"
+ val whereString: String = config.getString(Where, EmptyString)
+ val url: String = config.getString(Url, EmptyString)
+ val user: String = config.getString(User, EmptyString)
+ val password: String = config.getString(Password, EmptyString)
+ val driver: String = config.getString(Driver, DefaultDriver)
+
+ require(url.nonEmpty, "JDBC connection: connection url is mandatory")
+ require(user.nonEmpty, "JDBC connection: user name is mandatory")
+ require(password.nonEmpty, "JDBC connection: password is mandatory")
+ require(tableName.nonEmpty, "JDBC connection: table is mandatory")
+ assert(isJDBCDriverLoaded(driver), s"JDBC driver ${driver} not present in classpath")
+
+ override def data(ms: Long): (Option[DataFrame], TimeRange) = {
+ val dfOpt = try {
+ val dtSql = createSqlStmt()
+ val prop = new java.util.Properties
+ prop.setProperty("user", user)
+ prop.setProperty("password", password)
+ prop.setProperty("driver", driver)
+ val df: DataFrame = sparkSession.read.jdbc(url, s"($dtSql) as t", prop)
+ val dfOpt = Some(df)
+ val preDfOpt = preProcess(dfOpt, ms)
+ preDfOpt
+ } catch {
+ case e: Throwable =>
+ error(s"loading table $fullTableName fails: ${e.getMessage}", e)
+ None
+ }
+ val tmsts = readTmst(ms)
+ (dfOpt, TimeRange(ms, tmsts))
+ }
+
+ /**
+ * @return the SQL statement, with the where condition appended if provided
+ */
+ private def createSqlStmt(): String = {
+ val tableClause = s"SELECT * FROM $fullTableName"
+ if (whereString.length > 0) {
+ s"$tableClause WHERE $whereString"
+ } else tableClause
+ }
+}
+
+object JDBCBasedDataConnector extends Loggable {
+ private val Database: String = "database"
+ private val TableName: String = "tablename"
+ private val Where: String = "where"
+ private val Url: String = "url"
+ private val User: String = "user"
+ private val Password: String = "password"
+ private val Driver: String = "driver"
+
+ private val DefaultDriver = "com.mysql.jdbc.Driver"
+ private val DefaultDatabase = "default"
+ private val EmptyString = ""
+
+ /**
+ * @param driver JDBC driver class name
+ * @return True if JDBC driver present in classpath
+ */
+ private def isJDBCDriverLoaded(driver: String): Boolean = {
+ try {
+ Class.forName(driver, false, this.getClass.getClassLoader)
+ true
+ } catch {
+ case x: ClassNotFoundException =>
+ griffinLogger.error(s"Provided JDBC driver ${driver} not found in classpath")
+ false
+ }
+ }
+}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
index 05d5d87..31502b2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
@@ -24,6 +24,9 @@ import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.ParamUtil._
+@deprecated(
+ "This class is deprecated. Use
'org.apache.griffin.measure.datasource.connector.batch.JDBCBasedDataConnector'",
+ "0.6.0")
case class MySqlDataConnector(
@transient sparkSession: SparkSession,
dcParam: DataConnectorParam,
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
new file mode 100644
index 0000000..e707ef7
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.griffin.measure.datasource.connector.batch
+
+import java.sql.DriverManager
+import java.util.Properties
+
+import org.scalatest.Matchers
+
+import org.apache.griffin.measure.SparkSuiteBase
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.datasource.TimestampStorage
+
+class JDBCBasedDataConnectorTest extends SparkSuiteBase with Matchers {
+
+ val url = "jdbc:h2:mem:test"
+ var conn: java.sql.Connection = null
+ val properties = new Properties()
+ properties.setProperty("user", "user")
+ properties.setProperty("password", "password")
+ properties.setProperty("rowId", "false")
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ Class.forName("org.h2.Driver", false, this.getClass.getClassLoader)
+ conn = DriverManager.getConnection(url, properties)
+ conn.prepareStatement("create schema griffin").executeUpdate()
+
+ conn.prepareStatement("drop table if exists
griffin.employee").executeUpdate()
+ conn
+ .prepareStatement(
+ "create table griffin.employee (name VARCHAR NOT NULL, id INTEGER NOT
NULL)")
+ .executeUpdate()
+ conn.prepareStatement("insert into griffin.employee values ('emp1',
1)").executeUpdate()
+ conn.prepareStatement("insert into griffin.employee values ('emp2',
2)").executeUpdate()
+ conn.commit()
+ }
+
+ private final val dcParam =
+ DataConnectorParam("jdbc", "1", "test_df", Map.empty[String, String], Nil)
+ private final val timestampStorage = TimestampStorage()
+
+ "JDBC based data connector" should "be able to read data from relational
database" in {
+ val configs = Map(
+ "database" -> "griffin",
+ "tablename" -> "employee",
+ "url" -> url,
+ "user" -> "user",
+ "password" -> "password",
+ "driver" -> "org.h2.Driver")
+ val dc = JDBCBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
+ val result = dc.data(1000L)
+ assert(result._1.isDefined)
+ assert(result._1.get.collect().length == 2)
+ }
+
+ "JDBC based data connector" should "be able to read data from relational
database with where condition" in {
+ val configs = Map(
+ "database" -> "griffin",
+ "tablename" -> "employee",
+ "url" -> url,
+ "user" -> "user",
+ "password" -> "password",
+ "driver" -> "org.h2.Driver",
+ "where" -> "id=1 and name='emp1'")
+ val dc = JDBCBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
+ val result = dc.data(1000L)
+ assert(result._1.isDefined)
+ assert(result._1.get.collect().length == 1)
+ }
+
+ "JDBC data connector" should "have URL field in config" in {
+ the[java.lang.IllegalArgumentException] thrownBy {
+ val configs = Map(
+ "database" -> "griffin",
+ "tablename" -> "employee",
+ "user" -> "user",
+ "password" -> "password",
+ "driver" -> "org.h2.Driver")
+ JDBCBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
+ } should have message "requirement failed: JDBC connection: connection url is mandatory"
+ }
+
+ "JDBC data connector" should "have table name field in config" in {
+ the[java.lang.IllegalArgumentException] thrownBy {
+ val configs = Map(
+ "database" -> "griffin",
+ "url" -> url,
+ "user" -> "user",
+ "password" -> "password",
+ "driver" -> "org.h2.Driver")
+ JDBCBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
+ } should have message "requirement failed: JDBC connection: table is mandatory"
+ }
+
+ "JDBC data connector" should "have user name field in config" in {
+ the[java.lang.IllegalArgumentException] thrownBy {
+ val configs = Map(
+ "database" -> "griffin",
+ "url" -> url,
+ "tablename" -> "employee",
+ "password" -> "password",
+ "driver" -> "org.h2.Driver")
+ JDBCBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
+ } should have message "requirement failed: JDBC connection: user name is mandatory"
+ }
+
+ "JDBC data connector" should "have table password field in config" in {
+ the[java.lang.IllegalArgumentException] thrownBy {
+ val configs = Map(
+ "database" -> "griffin",
+ "url" -> url,
+ "tablename" -> "employee",
+ "user" -> "user",
+ "driver" -> "org.h2.Driver")
+ JDBCBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
+ } should have message "requirement failed: JDBC connection: password is mandatory"
+ }
+
+ "JDBC data connector" should "have driver provided in config in classpath"
in {
+ the[AssertionError] thrownBy {
+ val configs = Map(
+ "database" -> "griffin",
+ "url" -> url,
+ "tablename" -> "employee",
+ "user" -> "user",
+ "password" -> "password",
+ "driver" -> "org.postgresql.Driver")
+ JDBCBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
+ }
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ conn.close()
+ }
+}