pass streaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/365a85d1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/365a85d1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/365a85d1 Branch: refs/heads/griffin-0.2.0-incubating-rc4 Commit: 365a85d14028dcf0d5f2e77f3e152e8dca75a504 Parents: 6dd65d3 Author: Lionel Liu <[email protected]> Authored: Tue Apr 17 15:44:49 2018 +0800 Committer: Lionel Liu <[email protected]> Committed: Tue Apr 17 15:44:49 2018 +0800 ---------------------------------------------------------------------- .../griffin/measure/process/engine/DataFrameOprEngine.scala | 4 +--- .../scala/org/apache/griffin/measure/utils/HdfsUtil.scala | 2 +- .../src/test/resources/_accuracy-streaming-griffindsl.json | 8 +++++--- .../test/resources/_completeness-streaming-griffindsl.json | 7 ++++--- .../src/test/resources/_profiling-streaming-griffindsl.json | 7 ++++--- measure/src/test/resources/env-streaming.json | 1 + 6 files changed, 16 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/365a85d1/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala index 600da45..c06406c 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala @@ -120,11 +120,9 @@ object DataFrameOprs { } } - implicit val encoder = Encoders.tuple(Encoders.scalaLong, Encoders.bean(classOf[AccuracyResult])) - val df = sqlContext.table(s"`${dfName}`") - val results = df.flatMap { row => + val results = df.rdd.flatMap { row => try { val tmst = getLong(row, InternalColumns.tmst).getOrElse(timeInfo.calcTime) val missCount = getLong(row, miss).getOrElse(0L) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/365a85d1/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala index aa5643b..0a91fab 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala @@ -28,7 +28,7 @@ object HdfsUtil extends Loggable { private val conf = new Configuration() conf.setBoolean("dfs.support.append", true) -// conf.set("fs.defaultFS", "hdfs://localhost") // debug @localhost + conf.set("fs.defaultFS", "hdfs://localhost") // debug @localhost private val dfs = FileSystem.get(conf) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/365a85d1/measure/src/test/resources/_accuracy-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_accuracy-streaming-griffindsl.json b/measure/src/test/resources/_accuracy-streaming-griffindsl.json index a0e2e7d..240d768 100644 --- a/measure/src/test/resources/_accuracy-streaming-griffindsl.json +++ b/measure/src/test/resources/_accuracy-streaming-griffindsl.json @@ -13,7 +13,7 @@ "version": "0.8", "config": { "kafka.config": { - "bootstrap.servers": "10.149.247.156:9092", + "bootstrap.servers": "10.147.177.107:9092", "group.id": "group1", "auto.offset.reset": "smallest", "auto.commit.enable": "false" @@ -46,6 +46,7 @@ "ready.time.interval": "10s", "ready.time.delay": "0", "time.range": ["-2m", "0"], + "init.clear": true, "updatable": true } }, { @@ -56,7 +57,7 @@ "version": "0.8", "config": { "kafka.config": { - "bootstrap.servers": "10.149.247.156:9092", + "bootstrap.servers": "10.147.177.107:9092", "group.id": "group1", "auto.offset.reset": "smallest", "auto.commit.enable": "false" @@ -88,7 +89,8 @@ "info.path": "target", "ready.time.interval": "10s", "ready.time.delay": "0", - "time.range": ["-2m", "0"] + "time.range": ["-2m", "0"], + "init.clear": true } } ], http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/365a85d1/measure/src/test/resources/_completeness-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_completeness-streaming-griffindsl.json b/measure/src/test/resources/_completeness-streaming-griffindsl.json index df1b889..ba8bdce 100644 --- a/measure/src/test/resources/_completeness-streaming-griffindsl.json +++ b/measure/src/test/resources/_completeness-streaming-griffindsl.json @@ -39,11 +39,12 @@ } ], "cache": { - "file.path": "hdfs://localhost/griffin/streaming/dump/old", - "info.path": "old", + "file.path": "hdfs://localhost/griffin/streaming/dump/source", + "info.path": "source", "ready.time.interval": "10s", "ready.time.delay": "0", - "time.range": ["0", "0"] + "time.range": ["0", "0"], + "init.clear": true } } ], http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/365a85d1/measure/src/test/resources/_profiling-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_profiling-streaming-griffindsl.json b/measure/src/test/resources/_profiling-streaming-griffindsl.json index e662897..b6feb5a 100644 --- a/measure/src/test/resources/_profiling-streaming-griffindsl.json +++ b/measure/src/test/resources/_profiling-streaming-griffindsl.json @@ -12,12 +12,12 @@ "version": "0.8", "config": { "kafka.config": { - "bootstrap.servers": "10.149.247.156:9092", + "bootstrap.servers": "10.147.177.107:9092", "group.id": "group1", "auto.offset.reset": "smallest", "auto.commit.enable": "false" }, - "topics": "sss", + "topics": "test", "key.type": "java.lang.String", "value.type": "java.lang.String" }, @@ -43,7 +43,8 @@ "info.path": "source", "ready.time.interval": "10s", "ready.time.delay": "0", - "time.range": ["0", "0"] + "time.range": ["0", "0"], + "init.clear": true } } ], http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/365a85d1/measure/src/test/resources/env-streaming.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/env-streaming.json b/measure/src/test/resources/env-streaming.json index a01348f..08dd7ee 100644 --- a/measure/src/test/resources/env-streaming.json +++ b/measure/src/test/resources/env-streaming.json @@ -4,6 +4,7 @@ "checkpoint.dir": "hdfs://localhost/test/griffin/cp", "batch.interval": "2s", "process.interval": "10s", + "init.clear": true, "config": { "spark.master": "local[*]", "spark.task.maxFailures": 5,
