remove count operation (drops the data.count() call from S2SparkSqlStreamingSink.addBatch)
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/14c50ea3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/14c50ea3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/14c50ea3

Branch: refs/heads/master
Commit: 14c50ea34003334093ba73e958aaef142b20656b
Parents: a3227fd
Author: Chul Kang <[email protected]>
Authored: Tue Mar 27 21:28:40 2018 +0900
Committer: Chul Kang <[email protected]>
Committed: Thu Mar 29 12:03:29 2018 +0900

----------------------------------------------------------------------
 .../s2graph/spark/sql/streaming/S2SparkSqlStreamingSink.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/14c50ea3/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SparkSqlStreamingSink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SparkSqlStreamingSink.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SparkSqlStreamingSink.scala
index e705a7a..c41b231 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SparkSqlStreamingSink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SparkSqlStreamingSink.scala
@@ -25,15 +25,14 @@ class S2SparkSqlStreamingSink(
   }
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
-    val dataCnt = data.count()
-    logger.debug(s"addBatch : $batchId, getLatest : ${writeLog.getLatest()}, data cnt : ${dataCnt}")
+    logger.debug(s"addBatch : $batchId, getLatest : ${writeLog.getLatest()}")
 
     if (batchId <= writeLog.getLatest().map(_._1).getOrElse(-1L)) {
       logger.info(s"Skipping already committed batch [$batchId]")
     } else {
       val queryName = getConfigStringOpt(config, "queryname").getOrElse(UUID.randomUUID().toString)
       val commitProtocol = new S2CommitProtocol(writeLog)
-      val jobState = JobState(queryName, batchId, dataCnt)
+      val jobState = JobState(queryName, batchId)
       val serializedConfig = config.root().render(ConfigRenderOptions.concise())
       val queryExecution = data.queryExecution
       val schema = data.schema
