[PIO-137] Create a connection object at a worker to delete events This closes #446
Project: http://git-wip-us.apache.org/repos/asf/predictionio/repo Commit: http://git-wip-us.apache.org/repos/asf/predictionio/commit/ef6a4909 Tree: http://git-wip-us.apache.org/repos/asf/predictionio/tree/ef6a4909 Diff: http://git-wip-us.apache.org/repos/asf/predictionio/diff/ef6a4909 Branch: refs/heads/livedoc Commit: ef6a4909cd39e42b29391c46befb21f66ee850c7 Parents: 161bc0e Author: Shinsuke Sugaya <[email protected]> Authored: Thu Mar 1 14:13:07 2018 -0800 Committer: Chan Lee <[email protected]> Committed: Thu Mar 1 14:13:07 2018 -0800 ---------------------------------------------------------------------- LICENSE.txt | 3 ++- build.sbt | 2 +- storage/jdbc/build.sbt | 2 +- .../data/storage/jdbc/JDBCPEvents.scala | 18 +++++++++++------- 4 files changed, 15 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/predictionio/blob/ef6a4909/LICENSE.txt ---------------------------------------------------------------------- diff --git a/LICENSE.txt b/LICENSE.txt index f29befc..72f53a3 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -1359,9 +1359,9 @@ Binary distribution bundles -------------------------------------------------------------------------------- Binary distribution bundles + org.slf4j # slf4j-api # 1.7.25 (https://www.slf4j.org/) org.slf4j # slf4j-api # 1.7.18 (https://www.slf4j.org/) org.slf4j # slf4j-api # 1.7.16 (https://www.slf4j.org/) - org.slf4j # slf4j-api # 1.7.14 (https://www.slf4j.org/) org.slf4j # slf4j-api # 1.7.10 (https://www.slf4j.org/) org.slf4j # slf4j-api # 1.7.2 (https://www.slf4j.org/) org.slf4j # slf4j-log4j12 # 1.7.18 (https://www.slf4j.org/) @@ -1709,6 +1709,7 @@ Binary distribution bundles org.scala-lang # scalap # 2.11.8 (http://scala-lang.org/) org.scala-lang.modules # scala-java8-compat_2.11 # 0.7.0 (http://scala-lang.org/) org.scala-lang.modules # scala-parser-combinators_2.11 # 1.0.4 (http://scala-lang.org/) + org.scala-lang.modules # scala-parser-combinators_2.11 # 1.0.6 (http://scala-lang.org/) org.scala-lang.modules # scala-xml_2.11 # 1.0.3 (http://scala-lang.org/) org.scala-lang.modules # scala-xml_2.11 # 1.0.4 (http://scala-lang.org/) http://git-wip-us.apache.org/repos/asf/predictionio/blob/ef6a4909/build.sbt ---------------------------------------------------------------------- diff --git a/build.sbt b/build.sbt index 7a81249..9776fda 100644 --- a/build.sbt +++ b/build.sbt @@ -111,7 +111,7 @@ val commonSettings = Seq( val commonTestSettings = Seq( libraryDependencies ++= Seq( "org.postgresql" % "postgresql" % "9.4-1204-jdbc41" % "test", - "org.scalikejdbc" %% "scalikejdbc" % "2.3.5" % "test")) + "org.scalikejdbc" %% "scalikejdbc" % "3.1.0" % "test")) val dataElasticsearch1 = (project in file("storage/elasticsearch1")). settings(commonSettings: _*). http://git-wip-us.apache.org/repos/asf/predictionio/blob/ef6a4909/storage/jdbc/build.sbt ---------------------------------------------------------------------- diff --git a/storage/jdbc/build.sbt b/storage/jdbc/build.sbt index c5bcb12..9026540 100644 --- a/storage/jdbc/build.sbt +++ b/storage/jdbc/build.sbt @@ -22,7 +22,7 @@ name := "apache-predictionio-data-jdbc" libraryDependencies ++= Seq( "org.apache.predictionio" %% "apache-predictionio-core" % version.value % "provided", "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", - "org.scalikejdbc" %% "scalikejdbc" % "2.3.5", + "org.scalikejdbc" %% "scalikejdbc" % "3.1.0", "org.postgresql" % "postgresql" % "9.4-1204-jdbc41" % "test", "org.specs2" %% "specs2" % "2.3.13" % "test") http://git-wip-us.apache.org/repos/asf/predictionio/blob/ef6a4909/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala ---------------------------------------------------------------------- diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala index 487ae2f..d31e592 100644 --- a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala +++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala @@ -171,15 +171,19 @@ class JDBCPEvents(client: String, config: StorageClientConfig, namespace: String def delete(eventIds: RDD[String], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = { eventIds.foreachPartition{ iter => - - iter.foreach { eventId => - DB localTx { implicit session => + DB( + DriverManager.getConnection( + client, + config.properties("USERNAME"), + config.properties("PASSWORD")) + ) localTx { implicit session => val tableName = JDBCUtils.eventTableName(namespace, appId, channelId) val table = SQLSyntax.createUnsafely(tableName) - sql""" - delete from $table where id = $eventId - """.update().apply() - true + + iter.foreach { eventId => + sql""" + delete from $table where id = $eventId + """.update().apply() } } }
