[PIO-137] Create a connection object at a worker to delete events

This closes #446


Project: http://git-wip-us.apache.org/repos/asf/predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/predictionio/commit/ef6a4909
Tree: http://git-wip-us.apache.org/repos/asf/predictionio/tree/ef6a4909
Diff: http://git-wip-us.apache.org/repos/asf/predictionio/diff/ef6a4909

Branch: refs/heads/livedoc
Commit: ef6a4909cd39e42b29391c46befb21f66ee850c7
Parents: 161bc0e
Author: Shinsuke Sugaya <shins...@yahoo.co.jp>
Authored: Thu Mar 1 14:13:07 2018 -0800
Committer: Chan Lee <chanlee...@gmail.com>
Committed: Thu Mar 1 14:13:07 2018 -0800

----------------------------------------------------------------------
 LICENSE.txt                                       |  3 ++-
 build.sbt                                         |  2 +-
 storage/jdbc/build.sbt                            |  2 +-
 .../data/storage/jdbc/JDBCPEvents.scala           | 18 +++++++++++-------
 4 files changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/predictionio/blob/ef6a4909/LICENSE.txt
----------------------------------------------------------------------
diff --git a/LICENSE.txt b/LICENSE.txt
index f29befc..72f53a3 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -1359,9 +1359,9 @@ Binary distribution bundles
 
--------------------------------------------------------------------------------
 Binary distribution bundles
 
+  org.slf4j # slf4j-api # 1.7.25 (https://www.slf4j.org/)
   org.slf4j # slf4j-api # 1.7.18 (https://www.slf4j.org/)
   org.slf4j # slf4j-api # 1.7.16 (https://www.slf4j.org/)
-  org.slf4j # slf4j-api # 1.7.14 (https://www.slf4j.org/)
   org.slf4j # slf4j-api # 1.7.10 (https://www.slf4j.org/)
   org.slf4j # slf4j-api # 1.7.2 (https://www.slf4j.org/)
   org.slf4j # slf4j-log4j12 # 1.7.18 (https://www.slf4j.org/)
@@ -1709,6 +1709,7 @@ Binary distribution bundles
   org.scala-lang # scalap # 2.11.8 (http://scala-lang.org/)
   org.scala-lang.modules # scala-java8-compat_2.11 # 0.7.0 
(http://scala-lang.org/)
   org.scala-lang.modules # scala-parser-combinators_2.11 # 1.0.4 
(http://scala-lang.org/)
+  org.scala-lang.modules # scala-parser-combinators_2.11 # 1.0.6 
(http://scala-lang.org/)
   org.scala-lang.modules # scala-xml_2.11 # 1.0.3 (http://scala-lang.org/)
   org.scala-lang.modules # scala-xml_2.11 # 1.0.4 (http://scala-lang.org/)
   

http://git-wip-us.apache.org/repos/asf/predictionio/blob/ef6a4909/build.sbt
----------------------------------------------------------------------
diff --git a/build.sbt b/build.sbt
index 7a81249..9776fda 100644
--- a/build.sbt
+++ b/build.sbt
@@ -111,7 +111,7 @@ val commonSettings = Seq(
 val commonTestSettings = Seq(
   libraryDependencies ++= Seq(
     "org.postgresql"   % "postgresql"  % "9.4-1204-jdbc41" % "test",
-    "org.scalikejdbc" %% "scalikejdbc" % "2.3.5" % "test"))
+    "org.scalikejdbc" %% "scalikejdbc" % "3.1.0" % "test"))
 
 val dataElasticsearch1 = (project in file("storage/elasticsearch1")).
   settings(commonSettings: _*).

http://git-wip-us.apache.org/repos/asf/predictionio/blob/ef6a4909/storage/jdbc/build.sbt
----------------------------------------------------------------------
diff --git a/storage/jdbc/build.sbt b/storage/jdbc/build.sbt
index c5bcb12..9026540 100644
--- a/storage/jdbc/build.sbt
+++ b/storage/jdbc/build.sbt
@@ -22,7 +22,7 @@ name := "apache-predictionio-data-jdbc"
 libraryDependencies ++= Seq(
   "org.apache.predictionio" %% "apache-predictionio-core" % version.value % 
"provided",
   "org.apache.spark"        %% "spark-sql"      % sparkVersion.value % 
"provided",
-  "org.scalikejdbc"         %% "scalikejdbc"    % "2.3.5",
+  "org.scalikejdbc"         %% "scalikejdbc"    % "3.1.0",
   "org.postgresql"           % "postgresql"     % "9.4-1204-jdbc41" % "test",
   "org.specs2"              %% "specs2"         % "2.3.13" % "test")
 

http://git-wip-us.apache.org/repos/asf/predictionio/blob/ef6a4909/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
----------------------------------------------------------------------
diff --git 
a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
 
b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
index 487ae2f..d31e592 100644
--- 
a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
+++ 
b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCPEvents.scala
@@ -171,15 +171,19 @@ class JDBCPEvents(client: String, config: 
StorageClientConfig, namespace: String
   def delete(eventIds: RDD[String], appId: Int, channelId: Option[Int])(sc: 
SparkContext): Unit = {
 
     eventIds.foreachPartition{ iter =>
-
-      iter.foreach { eventId =>
-        DB localTx { implicit session =>
+      DB(
+        DriverManager.getConnection(
+          client,
+          config.properties("USERNAME"),
+          config.properties("PASSWORD"))
+      ) localTx { implicit session =>
         val tableName = JDBCUtils.eventTableName(namespace, appId, channelId)
         val table = SQLSyntax.createUnsafely(tableName)
-        sql"""
-        delete from $table where id = $eventId
-        """.update().apply()
-        true
+
+        iter.foreach { eventId =>
+          sql"""
+          delete from $table where id = $eventId
+          """.update().apply()
         }
       }
     }

Reply via email to