Repository: incubator-griffin Updated Branches: refs/heads/master 615f57dea -> a2ea0e84e
fix bugs of measure and service module for new json format Author: Lionel Liu <[email protected]> Closes #400 from bhlx3lyx7/json-update-new. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/a2ea0e84 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/a2ea0e84 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/a2ea0e84 Branch: refs/heads/master Commit: a2ea0e84e94c584b12f2bed2a05a9d33c5c453bc Parents: 615f57d Author: Lionel Liu <[email protected]> Authored: Fri Aug 24 15:17:55 2018 +0800 Committer: William Guo <[email protected]> Committed: Fri Aug 24 15:17:55 2018 +0800 ---------------------------------------------------------------------- .../src/main/resources/config-streaming.json | 3 +- .../datasource/cache/StreamingCacheClient.scala | 14 +++++--- .../_accuracy-streaming-griffindsl.json | 4 +-- .../_completeness-streaming-griffindsl.json | 3 +- .../_profiling-streaming-griffindsl.json | 3 +- .../src/test/resources/env-streaming-mongo.json | 1 + .../org/apache/griffin/core/util/FSUtil.java | 37 +++++++++++++++----- .../src/main/resources/application.properties | 2 ++ .../src/main/resources/env/env_streaming.json | 2 +- .../src/test/resources/application.properties | 2 ++ 10 files changed, 48 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a2ea0e84/measure/src/main/resources/config-streaming.json ---------------------------------------------------------------------- diff --git a/measure/src/main/resources/config-streaming.json b/measure/src/main/resources/config-streaming.json index a7c47e8..1be7a0a 100644 --- a/measure/src/main/resources/config-streaming.json +++ b/measure/src/main/resources/config-streaming.json @@ -44,8 +44,7 @@ "info.path": "source", "ready.time.interval": "10s", "ready.time.delay": "0", - "time.range": ["0", "0"], - "init.clear": true + "time.range": ["0", "0"] } } ], http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a2ea0e84/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala index 0ebe6ba..b351f82 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala @@ -71,10 +71,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long] val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L) val deltaTimeRange: (Long, Long) = { def negative(n: Long): Long = if (n <= 0) n else 0 - case class StringSeq(values:Seq[String]) param.get(_TimeRange) match { - case Some(seq: StringSeq) => { - val nseq = seq.values.flatMap(TimeUtil.milliseconds(_)) + case Some(seq: Seq[String]) => { + val nseq = seq.flatMap(TimeUtil.milliseconds(_)) val ns = negative(nseq.headOption.getOrElse(0)) val ne = negative(nseq.tail.headOption.getOrElse(0)) (ns, ne) @@ -100,6 +99,11 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long] protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame + private def readDataFrameOpt(dfr: DataFrameReader, path: String): Option[DataFrame] = { + val df = readDataFrame(dfr, path) + if (df.count() > 0) Some(df) else None + } + /** * save data frame in dump phase * @param dfOpt data frame to be saved @@ -170,7 +174,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long] // new cache data val newDfOpt = try { val dfr = sqlContext.read - Some(readDataFrame(dfr, newFilePath).filter(filterStr)) + readDataFrameOpt(dfr, newFilePath).map(_.filter(filterStr)) } catch { case e: Throwable => { warn(s"read data source cache warn: ${e.getMessage}") @@ -184,7 +188,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long] val oldDfPath = s"${oldFilePath}/${idx}" try { val dfr = sqlContext.read - Some(readDataFrame(dfr, oldDfPath).filter(filterStr)) + readDataFrameOpt(dfr, oldDfPath).map(_.filter(filterStr)) } catch { case e: Throwable => { warn(s"read old data source cache warn: ${e.getMessage}") http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a2ea0e84/measure/src/test/resources/_accuracy-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_accuracy-streaming-griffindsl.json b/measure/src/test/resources/_accuracy-streaming-griffindsl.json index 4492a35..9ce25df 100644 --- a/measure/src/test/resources/_accuracy-streaming-griffindsl.json +++ b/measure/src/test/resources/_accuracy-streaming-griffindsl.json @@ -45,7 +45,6 @@ "ready.time.interval": "10s", "ready.time.delay": "0", "time.range": ["-2m", "0"], - "init.clear": true, "updatable": true } }, { @@ -87,8 +86,7 @@ "info.path": "target", "ready.time.interval": "10s", "ready.time.delay": "0", - "time.range": ["-2m", "0"], - "init.clear": true + "time.range": ["-2m", "0"] } } ], http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a2ea0e84/measure/src/test/resources/_completeness-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_completeness-streaming-griffindsl.json b/measure/src/test/resources/_completeness-streaming-griffindsl.json index 02f0a39..53e1e7b 100644 --- a/measure/src/test/resources/_completeness-streaming-griffindsl.json +++ b/measure/src/test/resources/_completeness-streaming-griffindsl.json @@ -42,8 +42,7 @@ "info.path": "source", "ready.time.interval": "10s", "ready.time.delay": "0", - "time.range": ["0", "0"], - "init.clear": true + "time.range": ["0", "0"] } } ], http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a2ea0e84/measure/src/test/resources/_profiling-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_profiling-streaming-griffindsl.json b/measure/src/test/resources/_profiling-streaming-griffindsl.json index a523434..1d28745 100644 --- a/measure/src/test/resources/_profiling-streaming-griffindsl.json +++ b/measure/src/test/resources/_profiling-streaming-griffindsl.json @@ -42,8 +42,7 @@ "info.path": "source", "ready.time.interval": "10s", "ready.time.delay": "0", - "time.range": ["0", "0"], - "init.clear": true + "time.range": ["0", "0"] } } ], http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a2ea0e84/measure/src/test/resources/env-streaming-mongo.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/env-streaming-mongo.json b/measure/src/test/resources/env-streaming-mongo.json index ef10aef..b06d611 100644 --- a/measure/src/test/resources/env-streaming-mongo.json +++ b/measure/src/test/resources/env-streaming-mongo.json @@ -4,6 +4,7 @@ "checkpoint.dir": "hdfs://localhost/test/griffin/cp", "batch.interval": "2s", "process.interval": "10s", + "init.clear": true, "config": { "spark.master": "local[*]", "spark.task.maxFailures": 5, http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a2ea0e84/service/src/main/java/org/apache/griffin/core/util/FSUtil.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/util/FSUtil.java b/service/src/main/java/org/apache/griffin/core/util/FSUtil.java index 71f55d2..b537ac6 100644 --- a/service/src/main/java/org/apache/griffin/core/util/FSUtil.java +++ b/service/src/main/java/org/apache/griffin/core/util/FSUtil.java @@ -40,14 +40,31 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +@Component public class FSUtil { private static final Logger LOGGER = LoggerFactory.getLogger(FSUtil.class); private static final int SAMPLE_ROW_COUNT = 100; + private static String fsDefaultName; + private static FileSystem fileSystem; + private static FileSystem defaultFS = getDefaultFileSystem(); + private static FileSystem getDefaultFileSystem() { + FileSystem fs = null; + Configuration conf = new Configuration(); + try { + fs = FileSystem.get(conf); + } catch (Exception e) { + LOGGER.error("Can not get default hdfs file system. {}", e); + } + return fs; + } + private static FileSystem getFileSystem() { if (fileSystem == null) { initFileSystem(); @@ -55,26 +72,30 @@ public class FSUtil { return fileSystem; } + public FSUtil(@Value("${fs.defaultFS}") String defaultName) { + fsDefaultName = defaultName; + } private static void initFileSystem() { Configuration conf = new Configuration(); + if (!StringUtils.isEmpty(fsDefaultName)) { + conf.set("fs.defaultFS", fsDefaultName); + LOGGER.info("Setting fs.defaultFS:{}", fsDefaultName); + } if (StringUtils.isEmpty(conf.get("fs.hdfs.impl"))) { - LOGGER.info("Setting fs.hdfs.impl:{}", org.apache.hadoop.hdfs - .DistributedFileSystem.class.getName()); - conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs - .DistributedFileSystem.class.getName()); + LOGGER.info("Setting fs.hdfs.impl:{}", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); + conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); } if (StringUtils.isEmpty(conf.get("fs.file.impl"))) { - LOGGER.info("Setting fs.file.impl:{}", org.apache.hadoop.fs - .LocalFileSystem.class.getName()); - conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class - .getName()); + LOGGER.info("Setting fs.file.impl:{}", org.apache.hadoop.fs.LocalFileSystem.class.getName()); + conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); } try { fileSystem = FileSystem.get(conf); } catch (Exception e) { LOGGER.error("Can not get hdfs file system. {}", e); + fileSystem = defaultFS; } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a2ea0e84/service/src/main/resources/application.properties ---------------------------------------------------------------------- diff --git a/service/src/main/resources/application.properties b/service/src/main/resources/application.properties index c93d0e0..0577721 100644 --- a/service/src/main/resources/application.properties +++ b/service/src/main/resources/application.properties @@ -50,6 +50,8 @@ ldap.url=ldap://hostname:port [email protected] ldap.searchBase=DC=org,DC=example ldap.searchPattern=(sAMAccountName={0}) +# hdfs +# fs.defaultFS=hdfs://hdfs-default-name # elasticsearch elasticsearch.host=localhost elasticsearch.port=9200 http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a2ea0e84/service/src/main/resources/env/env_streaming.json ---------------------------------------------------------------------- diff --git a/service/src/main/resources/env/env_streaming.json b/service/src/main/resources/env/env_streaming.json index 6508af1..9c3a8ae 100644 --- a/service/src/main/resources/env/env_streaming.json +++ b/service/src/main/resources/env/env_streaming.json @@ -48,7 +48,7 @@ "namespace": "griffin/infocache", "lock.path": "lock", "mode": "persist", - "init.clear": false, + "init.clear": true, "close.clear": false } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a2ea0e84/service/src/test/resources/application.properties ---------------------------------------------------------------------- diff --git a/service/src/test/resources/application.properties b/service/src/test/resources/application.properties index 2a5f839..96d28dc 100644 --- a/service/src/test/resources/application.properties +++ b/service/src/test/resources/application.properties @@ -42,6 +42,8 @@ predicate.job.interval=5m predicate.job.repeat.count=12 # external properties directory location external.config.location= +# external BATCH or STREAMING env +external.env.location= # login strategy ("test" or "ldap") login.strategy=test # ldap
