This is an automated email from the ASF dual-hosted git repository.
wanghe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/master by this push:
new 9f3197cb JDBC Connector SQL mode (#628)
9f3197cb is described below
commit 9f3197cb2869f5d15a2d3b965650e22ce254f3aa
Author: Yu Xiaoyu <[email protected]>
AuthorDate: Tue May 9 10:05:30 2023 +0800
JDBC Connector SQL mode (#628)
* jdbc sql mode
* add ut
* add description for jdbc connector config
---
griffin-doc/measure/measure-configuration-guide.md | 18 +++++++++++++++++-
.../connector/batch/JDBCBasedDataConnector.scala | 7 ++++++-
.../connector/batch/JDBCBasedDataConnectorTest.scala | 14 +++++++++++++-
3 files changed, 36 insertions(+), 3 deletions(-)
diff --git a/griffin-doc/measure/measure-configuration-guide.md
b/griffin-doc/measure/measure-configuration-guide.md
index 300a6992..bed389fa 100644
--- a/griffin-doc/measure/measure-configuration-guide.md
+++ b/griffin-doc/measure/measure-configuration-guide.md
@@ -553,8 +553,10 @@ List of supported data connectors:
| password | `String` | password for connection to database | `null` |
| driver | `String` | driver class for JDBC connection to database |
com.mysql.jdbc.Driver |
| where | `String` | condition for reading data from table | `Empty` |
+| sql | `String` | SQL query to execute; when set, `database`, `tablename` and `where` are ignored | `Empty` |
-- Example:
+
+- Example (without `sql` provided):
```
"connector": {
"type": "jdbc",
@@ -568,6 +570,20 @@ List of supported data connectors:
"where": ""
}
}
+
+- If the config `sql` is provided, `database`, `tablename` and `where` can
all be ignored; the connector will extract the result of the SQL query.
+- Example (with `sql` provided):
+ ```
+ "connector": {
+ "type": "jdbc",
+ "config": {
+ "sql": "select col_a, col_b from griffin.griffin where id > 100 limit
100",
+ "url": "jdbc:mysql://localhost:3306/default",
+ "user": "test_u",
+ "password": "test_p",
+ "driver": "com.mysql.jdbc.Driver"
+ }
+ }
+ ```
**Note:** Jar containing driver class should be present in Spark job's class
path, by either providing custom jar with
`--jars` parameter to spark-submit or by adding setting `spark.jars` in `spark
-> config` section of environment config.
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala
b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala
index 2f77db3c..688cbdb9 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala
@@ -55,6 +55,7 @@ case class JDBCBasedDataConnector(
val config: Map[String, Any] = dcParam.getConfig
val database: String = config.getString(Database, DefaultDatabase)
val tableName: String = config.getString(TableName, EmptyString)
+ val sql: String = config.getString(Sql, EmptyString)
val fullTableName: String = s"$database.$tableName"
val whereString: String = config.getString(Where, EmptyString)
val url: String = config.getString(Url, EmptyString)
@@ -65,7 +66,7 @@ case class JDBCBasedDataConnector(
require(url.nonEmpty, "JDBC connection: connection url is mandatory")
require(user.nonEmpty, "JDBC connection: user name is mandatory")
require(password.nonEmpty, "JDBC connection: password is mandatory")
- require(tableName.nonEmpty, "JDBC connection: table is mandatory")
+ require(sql.nonEmpty || tableName.nonEmpty, "JDBC connection: sql or table
is mandatory")
assert(isJDBCDriverLoaded(driver), s"JDBC driver $driver not present in
classpath")
override def data(ms: Long): (Option[DataFrame], TimeRange) = {
@@ -99,6 +100,9 @@ case class JDBCBasedDataConnector(
* @return Return SQL statement with where condition if provided
*/
private def createSqlStmt(): String = {
+ if (sql.nonEmpty) {
+ return sql
+ }
val tableClause = s"SELECT * FROM $fullTableName"
if (whereString.nonEmpty) {
s"$tableClause WHERE $whereString"
@@ -109,6 +113,7 @@ case class JDBCBasedDataConnector(
object JDBCBasedDataConnector extends Loggable {
private val Database: String = "database"
private val TableName: String = "tablename"
+ private val Sql: String = "sql"
private val Where: String = "where"
private val Url: String = "url"
private val User: String = "user"
diff --git
a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
index acfac72f..8f455de9 100644
---
a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
+++
b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
@@ -105,7 +105,19 @@ class JDBCBasedDataConnectorTest extends SparkSuiteBase
with Matchers {
"password" -> "password",
"driver" -> "org.h2.Driver")
JDBCBasedDataConnector(spark, dcParam.copy(config = configs),
timestampStorage)
- } should have message "requirement failed: JDBC connection: table is
mandatory"
+ } should have message "requirement failed: JDBC connection: sql or table
is mandatory"
+ }
+
+ "JDBC data connector" should "have user name field in config when sql is provided" in {
+ the[java.lang.IllegalArgumentException] thrownBy {
+ val configs = Map(
+ "database" -> "griffin",
+ "url" -> url,
+ "sql" -> "select col_a, col_b, col_c from griffin.griffin limit 10000",
+ "password" -> "password",
+ "driver" -> "org.h2.Driver")
+ JDBCBasedDataConnector(spark, dcParam.copy(config = configs),
timestampStorage)
+ } should have message "requirement failed: JDBC connection: user name is
mandatory"
}
"JDBC data connector" should "have user name field in config" in {