Repository: incubator-griffin
Updated Branches:
  refs/heads/master 615f57dea -> a2ea0e84e


Fix bugs in the measure and service modules for the new JSON format

Author: Lionel Liu <[email protected]>

Closes #400 from bhlx3lyx7/json-update-new.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/a2ea0e84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/a2ea0e84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/a2ea0e84

Branch: refs/heads/master
Commit: a2ea0e84e94c584b12f2bed2a05a9d33c5c453bc
Parents: 615f57d
Author: Lionel Liu <[email protected]>
Authored: Fri Aug 24 15:17:55 2018 +0800
Committer: William Guo <[email protected]>
Committed: Fri Aug 24 15:17:55 2018 +0800

----------------------------------------------------------------------
 .../src/main/resources/config-streaming.json    |  3 +-
 .../datasource/cache/StreamingCacheClient.scala | 14 +++++---
 .../_accuracy-streaming-griffindsl.json         |  4 +--
 .../_completeness-streaming-griffindsl.json     |  3 +-
 .../_profiling-streaming-griffindsl.json        |  3 +-
 .../src/test/resources/env-streaming-mongo.json |  1 +
 .../org/apache/griffin/core/util/FSUtil.java    | 37 +++++++++++++++-----
 .../src/main/resources/application.properties   |  2 ++
 .../src/main/resources/env/env_streaming.json   |  2 +-
 .../src/test/resources/application.properties   |  2 ++
 10 files changed, 48 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a2ea0e84/measure/src/main/resources/config-streaming.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config-streaming.json b/measure/src/main/resources/config-streaming.json
index a7c47e8..1be7a0a 100644
--- a/measure/src/main/resources/config-streaming.json
+++ b/measure/src/main/resources/config-streaming.json
@@ -44,8 +44,7 @@
         "info.path": "source",
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
-        "time.range": ["0", "0"],
-        "init.clear": true
+        "time.range": ["0", "0"]
       }
     }
   ],

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a2ea0e84/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
index 0ebe6ba..b351f82 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
@@ -71,10 +71,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable 
with WithFanIn[Long]
   val readyTimeDelay: Long = 
TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
   val deltaTimeRange: (Long, Long) = {
     def negative(n: Long): Long = if (n <= 0) n else 0
-    case class StringSeq(values:Seq[String])
     param.get(_TimeRange) match {
-      case Some(seq: StringSeq) => {
-        val nseq = seq.values.flatMap(TimeUtil.milliseconds(_))
+      case Some(seq: Seq[String]) => {
+        val nseq = seq.flatMap(TimeUtil.milliseconds(_))
         val ns = negative(nseq.headOption.getOrElse(0))
         val ne = negative(nseq.tail.headOption.getOrElse(0))
         (ns, ne)
@@ -100,6 +99,11 @@ trait StreamingCacheClient extends StreamingOffsetCacheable 
with WithFanIn[Long]
   protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit
   protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame
 
+  private def readDataFrameOpt(dfr: DataFrameReader, path: String): 
Option[DataFrame] = {
+    val df = readDataFrame(dfr, path)
+    if (df.count() > 0) Some(df) else None
+  }
+
   /**
     * save data frame in dump phase
     * @param dfOpt    data frame to be saved
@@ -170,7 +174,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable 
with WithFanIn[Long]
     // new cache data
     val newDfOpt = try {
       val dfr = sqlContext.read
-      Some(readDataFrame(dfr, newFilePath).filter(filterStr))
+      readDataFrameOpt(dfr, newFilePath).map(_.filter(filterStr))
     } catch {
       case e: Throwable => {
         warn(s"read data source cache warn: ${e.getMessage}")
@@ -184,7 +188,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable 
with WithFanIn[Long]
       val oldDfPath = s"${oldFilePath}/${idx}"
       try {
         val dfr = sqlContext.read
-        Some(readDataFrame(dfr, oldDfPath).filter(filterStr))
+        readDataFrameOpt(dfr, oldDfPath).map(_.filter(filterStr))
       } catch {
         case e: Throwable => {
           warn(s"read old data source cache warn: ${e.getMessage}")

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a2ea0e84/measure/src/test/resources/_accuracy-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_accuracy-streaming-griffindsl.json b/measure/src/test/resources/_accuracy-streaming-griffindsl.json
index 4492a35..9ce25df 100644
--- a/measure/src/test/resources/_accuracy-streaming-griffindsl.json
+++ b/measure/src/test/resources/_accuracy-streaming-griffindsl.json
@@ -45,7 +45,6 @@
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
         "time.range": ["-2m", "0"],
-        "init.clear": true,
         "updatable": true
       }
     }, {
@@ -87,8 +86,7 @@
         "info.path": "target",
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
-        "time.range": ["-2m", "0"],
-        "init.clear": true
+        "time.range": ["-2m", "0"]
       }
     }
   ],

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a2ea0e84/measure/src/test/resources/_completeness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_completeness-streaming-griffindsl.json b/measure/src/test/resources/_completeness-streaming-griffindsl.json
index 02f0a39..53e1e7b 100644
--- a/measure/src/test/resources/_completeness-streaming-griffindsl.json
+++ b/measure/src/test/resources/_completeness-streaming-griffindsl.json
@@ -42,8 +42,7 @@
         "info.path": "source",
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
-        "time.range": ["0", "0"],
-        "init.clear": true
+        "time.range": ["0", "0"]
       }
     }
   ],

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a2ea0e84/measure/src/test/resources/_profiling-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_profiling-streaming-griffindsl.json b/measure/src/test/resources/_profiling-streaming-griffindsl.json
index a523434..1d28745 100644
--- a/measure/src/test/resources/_profiling-streaming-griffindsl.json
+++ b/measure/src/test/resources/_profiling-streaming-griffindsl.json
@@ -42,8 +42,7 @@
         "info.path": "source",
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
-        "time.range": ["0", "0"],
-        "init.clear": true
+        "time.range": ["0", "0"]
       }
     }
   ],

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a2ea0e84/measure/src/test/resources/env-streaming-mongo.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/env-streaming-mongo.json b/measure/src/test/resources/env-streaming-mongo.json
index ef10aef..b06d611 100644
--- a/measure/src/test/resources/env-streaming-mongo.json
+++ b/measure/src/test/resources/env-streaming-mongo.json
@@ -4,6 +4,7 @@
     "checkpoint.dir": "hdfs://localhost/test/griffin/cp",
     "batch.interval": "2s",
     "process.interval": "10s",
+    "init.clear": true,
     "config": {
       "spark.master": "local[*]",
       "spark.task.maxFailures": 5,

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a2ea0e84/service/src/main/java/org/apache/griffin/core/util/FSUtil.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/util/FSUtil.java b/service/src/main/java/org/apache/griffin/core/util/FSUtil.java
index 71f55d2..b537ac6 100644
--- a/service/src/main/java/org/apache/griffin/core/util/FSUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/FSUtil.java
@@ -40,14 +40,31 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
 
+@Component
 public class FSUtil {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(FSUtil.class);
     private static final int SAMPLE_ROW_COUNT = 100;
 
+    private static String fsDefaultName;
+
     private static FileSystem fileSystem;
 
+    private static FileSystem defaultFS = getDefaultFileSystem();
+    private static FileSystem getDefaultFileSystem() {
+        FileSystem fs = null;
+        Configuration conf = new Configuration();
+        try {
+            fs = FileSystem.get(conf);
+        } catch (Exception e) {
+            LOGGER.error("Can not get default hdfs file system. {}", e);
+        }
+        return fs;
+    }
+
     private static FileSystem getFileSystem() {
         if (fileSystem == null) {
             initFileSystem();
@@ -55,26 +72,30 @@ public class FSUtil {
         return fileSystem;
     }
 
+    public FSUtil(@Value("${fs.defaultFS}") String defaultName) {
+        fsDefaultName = defaultName;
+    }
 
     private static void initFileSystem() {
         Configuration conf = new Configuration();
+        if (!StringUtils.isEmpty(fsDefaultName)) {
+            conf.set("fs.defaultFS", fsDefaultName);
+            LOGGER.info("Setting fs.defaultFS:{}", fsDefaultName);
+        }
 
         if (StringUtils.isEmpty(conf.get("fs.hdfs.impl"))) {
-            LOGGER.info("Setting fs.hdfs.impl:{}", org.apache.hadoop.hdfs
-                    .DistributedFileSystem.class.getName());
-            conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs
-                    .DistributedFileSystem.class.getName());
+            LOGGER.info("Setting fs.hdfs.impl:{}", 
org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+            conf.set("fs.hdfs.impl", 
org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
         }
         if (StringUtils.isEmpty(conf.get("fs.file.impl"))) {
-            LOGGER.info("Setting fs.file.impl:{}", org.apache.hadoop.fs
-                    .LocalFileSystem.class.getName());
-            conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class
-                    .getName());
+            LOGGER.info("Setting fs.file.impl:{}", 
org.apache.hadoop.fs.LocalFileSystem.class.getName());
+            conf.set("fs.file.impl", 
org.apache.hadoop.fs.LocalFileSystem.class.getName());
         }
         try {
             fileSystem = FileSystem.get(conf);
         } catch (Exception e) {
             LOGGER.error("Can not get hdfs file system. {}", e);
+            fileSystem = defaultFS;
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a2ea0e84/service/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git a/service/src/main/resources/application.properties b/service/src/main/resources/application.properties
index c93d0e0..0577721 100644
--- a/service/src/main/resources/application.properties
+++ b/service/src/main/resources/application.properties
@@ -50,6 +50,8 @@ ldap.url=ldap://hostname:port
 [email protected]
 ldap.searchBase=DC=org,DC=example
 ldap.searchPattern=(sAMAccountName={0})
+# hdfs
+# fs.defaultFS=hdfs://hdfs-default-name
 # elasticsearch
 elasticsearch.host=localhost
 elasticsearch.port=9200

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a2ea0e84/service/src/main/resources/env/env_streaming.json
----------------------------------------------------------------------
diff --git a/service/src/main/resources/env/env_streaming.json b/service/src/main/resources/env/env_streaming.json
index 6508af1..9c3a8ae 100644
--- a/service/src/main/resources/env/env_streaming.json
+++ b/service/src/main/resources/env/env_streaming.json
@@ -48,7 +48,7 @@
         "namespace": "griffin/infocache",
         "lock.path": "lock",
         "mode": "persist",
-        "init.clear": false,
+        "init.clear": true,
         "close.clear": false
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a2ea0e84/service/src/test/resources/application.properties
----------------------------------------------------------------------
diff --git a/service/src/test/resources/application.properties b/service/src/test/resources/application.properties
index 2a5f839..96d28dc 100644
--- a/service/src/test/resources/application.properties
+++ b/service/src/test/resources/application.properties
@@ -42,6 +42,8 @@ predicate.job.interval=5m
 predicate.job.repeat.count=12
 # external properties directory location
 external.config.location=
+# external BATCH or STREAMING env
+external.env.location=
 # login strategy ("test" or "ldap")
 login.strategy=test
 # ldap

Reply via email to