remove count operation

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/14c50ea3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/14c50ea3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/14c50ea3

Branch: refs/heads/master
Commit: 14c50ea34003334093ba73e958aaef142b20656b
Parents: a3227fd
Author: Chul Kang <[email protected]>
Authored: Tue Mar 27 21:28:40 2018 +0900
Committer: Chul Kang <[email protected]>
Committed: Thu Mar 29 12:03:29 2018 +0900

----------------------------------------------------------------------
 .../s2graph/spark/sql/streaming/S2SparkSqlStreamingSink.scala   | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/14c50ea3/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SparkSqlStreamingSink.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SparkSqlStreamingSink.scala
 
b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SparkSqlStreamingSink.scala
index e705a7a..c41b231 100644
--- 
a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SparkSqlStreamingSink.scala
+++ 
b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SparkSqlStreamingSink.scala
@@ -25,15 +25,14 @@ class S2SparkSqlStreamingSink(
   }
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
-    val dataCnt = data.count()
-    logger.debug(s"addBatch : $batchId, getLatest : ${writeLog.getLatest()}, 
data cnt : ${dataCnt}")
+    logger.debug(s"addBatch : $batchId, getLatest : ${writeLog.getLatest()}")
 
     if (batchId <= writeLog.getLatest().map(_._1).getOrElse(-1L)) {
       logger.info(s"Skipping already committed batch [$batchId]")
     } else {
       val queryName = getConfigStringOpt(config, 
"queryname").getOrElse(UUID.randomUUID().toString)
       val commitProtocol = new S2CommitProtocol(writeLog)
-      val jobState = JobState(queryName, batchId, dataCnt)
+      val jobState = JobState(queryName, batchId)
       val serializedConfig = 
config.root().render(ConfigRenderOptions.concise())
       val queryExecution = data.queryExecution
       val schema = data.schema

Reply via email to