http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/ZKInfoCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/ZKInfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/ZKInfoCache.scala new file mode 100644 index 0000000..42166ae --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/ZKInfoCache.scala @@ -0,0 +1,217 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.context.streaming.info

import org.apache.curator.framework.imps.CuratorFrameworkState
import org.apache.curator.framework.recipes.locks.InterProcessMutex
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.utils.ZKPaths
import org.apache.griffin.measure.context.streaming.lock.ZKCacheLock
import org.apache.zookeeper.CreateMode

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

/**
 * Info cache backed by zookeeper: each key is stored as a znode under the
 * namespace `namespace/metricName` (or just `metricName` when no namespace is
 * configured), so several streaming applications can share cached state.
 *
 * @param config     cache settings: hosts, namespace, mode, init.clear,
 *                   close.clear, lock.path
 * @param metricName metric name, used as (part of) the zookeeper namespace
 */
case class ZKInfoCache(config: Map[String, Any], metricName: String) extends InfoCache {

  // config keys
  val Hosts = "hosts"
  val Namespace = "namespace"
  val Mode = "mode"
  val InitClear = "init.clear"
  val CloseClear = "close.clear"
  val LockPath = "lock.path"

  // accepted values for the "mode" config (case-insensitive)
  val PersistRegex = """^(?i)persist$""".r
  val EphemeralRegex = """^(?i)ephemeral$""".r

  final val separator = ZKPaths.PATH_SEPARATOR

  val hosts = config.getOrElse(Hosts, "").toString
  val namespace = config.getOrElse(Namespace, "").toString

  // znode creation mode; any unrecognized value falls back to PERSISTENT
  val mode: CreateMode = config.get(Mode) match {
    case Some(s: String) => s match {
      case PersistRegex() => CreateMode.PERSISTENT
      case EphemeralRegex() => CreateMode.EPHEMERAL
      case _ => CreateMode.PERSISTENT
    }
    case _ => CreateMode.PERSISTENT
  }

  // clear cached info on init (default true) / on close (default false)
  val initClear = config.get(InitClear) match {
    case Some(b: Boolean) => b
    case _ => true
  }
  val closeClear = config.get(CloseClear) match {
    case Some(b: Boolean) => b
    case _ => false
  }

  // base path for distributed locks created by genLock
  val lockPath = config.getOrElse(LockPath, "lock").toString

  private val cacheNamespace: String =
    if (namespace.isEmpty) metricName else namespace + separator + metricName

  // the namespace is fixed here, on the builder, for the whole client lifetime
  private val builder = CuratorFrameworkFactory.builder()
    .connectString(hosts)
    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
    .namespace(cacheNamespace)
  private val client: CuratorFramework = builder.build

  /**
   * Start the zookeeper client, drop stale lock znodes from a previous run,
   * and optionally clear previously cached info.
   *
   * Fix: the original called `client.usingNamespace(cacheNamespace)` and
   * discarded the returned facade — a no-op, since the namespace is already
   * configured on the builder above. The dead call is removed.
   */
  def init(): Unit = {
    client.start()
    info("start zk info cache")
    info(s"init with namespace: ${cacheNamespace}")
    // a leftover lock znode would block every new lock attempt
    deleteInfo(lockPath :: Nil)
    if (initClear) {
      clearInfo()
    }
  }

  /** @return true while the underlying curator client is in STARTED state */
  def available(): Boolean = {
    client.getState match {
      case CuratorFrameworkState.STARTED => true
      case _ => false
    }
  }

  /** Optionally clear cached info, then close the zookeeper client. */
  def close(): Unit = {
    if (closeClear) {
      clearInfo()
    }
    info("close zk info cache")
    client.close()
  }

  /**
   * Write all key/value pairs into the cache.
   * @return true only if every single write succeeded
   */
  def cacheInfo(info: Map[String, String]): Boolean = {
    info.foldLeft(true) { (rs, pair) =>
      val (k, v) = pair
      // the write is the left operand so one failure never short-circuits the rest
      createOrUpdate(path(k), v) && rs
    }
  }

  /** Read the given keys; keys that are missing or unreadable are omitted. */
  def readInfo(keys: Iterable[String]): Map[String, String] = {
    keys.flatMap { key =>
      read(path(key)) match {
        case Some(v) => Some((key, v))
        case _ => None
      }
    }.toMap
  }

  /** Delete the znodes for the given keys (best-effort, errors are logged). */
  def deleteInfo(keys: Iterable[String]): Unit = {
    keys.foreach { key => delete(path(key)) }
  }

  /** Remove the cached time-info paths (not the whole namespace). */
  def clearInfo(): Unit = {
    deleteInfo(TimeInfoCache.finalCacheInfoPath :: Nil)
    deleteInfo(TimeInfoCache.infoPath :: Nil)
    info("clear info")
  }

  /** List the direct children of the given cache path. */
  def listKeys(p: String): List[String] = {
    children(path(p))
  }

  /**
   * Create a distributed lock under the configured lock path.
   * @param s optional sub-lock name; empty means the root lock
   */
  def genLock(s: String): ZKCacheLock = {
    val lpt = if (s.isEmpty) path(lockPath) else path(lockPath) + separator + s
    ZKCacheLock(new InterProcessMutex(client, lpt))
  }

  // make the key an absolute zookeeper path
  private def path(k: String): String = {
    if (k.startsWith(separator)) k else separator + k
  }

  private def children(path: String): List[String] = {
    try {
      client.getChildren().forPath(path).asScala.toList
    } catch {
      case NonFatal(e) =>
        warn(s"list ${path} warn: ${e.getMessage}")
        Nil
    }
  }

  // create the znode if absent, otherwise overwrite its data
  private def createOrUpdate(path: String, content: String): Boolean = {
    if (checkExists(path)) {
      update(path, content)
    } else {
      create(path, content)
    }
  }

  private def create(path: String, content: String): Boolean = {
    try {
      client.create().creatingParentsIfNeeded().withMode(mode)
        .forPath(path, content.getBytes("utf-8"))
      true
    } catch {
      case NonFatal(e) =>
        error(s"create ( ${path} -> ${content} ) error: ${e.getMessage}")
        false
    }
  }

  private def update(path: String, content: String): Boolean = {
    try {
      client.setData().forPath(path, content.getBytes("utf-8"))
      true
    } catch {
      case NonFatal(e) =>
        error(s"update ( ${path} -> ${content} ) error: ${e.getMessage}")
        false
    }
  }

  private def read(path: String): Option[String] = {
    try {
      Some(new String(client.getData().forPath(path), "utf-8"))
    } catch {
      case NonFatal(e) =>
        warn(s"read ${path} warn: ${e.getMessage}")
        None
    }
  }

  private def delete(path: String): Unit = {
    try {
      // guaranteed(): curator retries the delete in background until it succeeds
      client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path)
    } catch {
      case NonFatal(e) => error(s"delete ${path} error: ${e.getMessage}")
    }
  }

  private def checkExists(path: String): Boolean = {
    try {
      client.checkExists().forPath(path) != null
    } catch {
      case NonFatal(e) =>
        warn(s"check exists ${path} warn: ${e.getMessage}")
        false
    }
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLock.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLock.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLock.scala new file mode 100644 index 0000000..a2cfad9 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLock.scala @@ -0,0 +1,34 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.context.streaming.lock

import java.util.concurrent.TimeUnit

import org.apache.griffin.measure.Loggable

/**
 * Lock guarding concurrent access to the streaming info cache.
 * Serializable so instances can be shipped with Spark closures.
 */
trait CacheLock extends Loggable with Serializable {

  /**
   * Try to acquire the lock within the given timeout.
   * @param outtime maximum time to wait; implementations in this module treat
   *                a negative value as "wait indefinitely" — confirm per impl
   * @param unit    time unit of `outtime`
   * @return true if the lock was acquired
   */
  def lock(outtime: Long, unit: TimeUnit): Boolean

  /** Release the lock; implementations are expected to make this best-effort. */
  def unlock(): Unit

}
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.context.streaming.lock

import java.util.concurrent.TimeUnit

/**
 * Composite cache lock. Only the FIRST underlying lock (if any) is ever
 * used; locking an empty composite trivially succeeds. Callers that need
 * every lock held must not rely on this class.
 */
case class MultiCacheLock(cacheLocks: List[CacheLock]) extends CacheLock {

  /** Acquire the first lock; vacuously true when the list is empty. */
  def lock(outtime: Long, unit: TimeUnit): Boolean =
    cacheLocks.headOption.forall(_.lock(outtime, unit))

  /** Release the first lock, if present; no-op for an empty list. */
  def unlock(): Unit =
    cacheLocks.headOption.foreach(_.unlock())

}
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.context.streaming.lock

import java.util.concurrent.TimeUnit

import org.apache.curator.framework.recipes.locks.InterProcessMutex

import scala.util.control.NonFatal

/**
 * Cache lock backed by a curator [[InterProcessMutex]] (a zookeeper
 * inter-process lock). The mutex is transient: it is not serializable and
 * must be re-created on each JVM.
 *
 * @param mutex the curator mutex to delegate to
 */
case class ZKCacheLock(@transient mutex: InterProcessMutex) extends CacheLock {

  /**
   * Acquire the mutex.
   *
   * Fix: a negative timeout previously called `mutex.acquire(-1, null)`,
   * relying on an undocumented internal convention; the public untimed
   * `acquire()` has exactly that "block until available" semantics.
   *
   * @param outtime maximum time to wait; negative means wait indefinitely
   * @param unit    time unit of `outtime` (ignored when `outtime` is negative)
   * @return true if acquired, false on timeout or error
   */
  def lock(outtime: Long, unit: TimeUnit): Boolean = {
    try {
      if (outtime >= 0) {
        mutex.acquire(outtime, unit)
      } else {
        // untimed acquire blocks until the lock is obtained, then we report success
        mutex.acquire()
        true
      }
    } catch {
      case NonFatal(e) =>
        error(s"lock error: ${e.getMessage}")
        false
    }
  }

  /** Release the mutex if this process holds it; errors are logged, not thrown. */
  def unlock(): Unit = {
    try {
      if (mutex.isAcquiredInThisProcess) mutex.release()
    } catch {
      case NonFatal(e) =>
        error(s"unlock error: ${e.getMessage}")
    }
  }

}
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.context.streaming.metric

/**
 * Accuracy metric: `miss` records failed to match out of `total` records.
 * In streaming mode the miss count can only decrease as late data arrives;
 * the total for a window never changes.
 *
 * @param miss  miss count
 * @param total total count
 */
case class AccuracyMetric(miss: Long, total: Long) extends Metric {

  type T = AccuracyMetric

  /** Meaningful only when the metric covers at least one record. */
  override def isLegal(): Boolean = getTotal > 0

  /** Keep the lower miss count; `total` is preserved across updates. */
  def update(delta: T): T =
    if (delta.miss < miss) AccuracyMetric(delta.miss, total) else this

  /** True while nothing has matched yet. */
  def initial(): Boolean = getMatch <= 0

  /** True once every record has matched — no further update can improve it. */
  def eventual(): Boolean = miss <= 0

  /** True when the two metrics disagree on either count. */
  def differsFrom(other: T): Boolean =
    miss != other.miss || total != other.total

  def getMiss: Long = miss
  def getTotal: Long = total
  def getMatch: Long = total - miss

  /** Percentage of matched records; 0 for an empty metric. */
  def matchPercentage: Double =
    if (getTotal <= 0) 0 else getMatch.toDouble / getTotal * 100

}
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.context.streaming.metric

import org.apache.griffin.measure.Loggable

import scala.collection.mutable.{Map => MutableMap}

/**
 * In streaming mode, metrics for a calculation window may be updated by
 * late-arriving data; the previous metric per window is cached here so a
 * new result is emitted only when something actually changed.
 */
object CacheResults extends Loggable {

  /**
   * A metric result cached for one calculation window.
   * @param timeStamp  timestamp of the calculation window (cache key)
   * @param updateTime when this result was produced
   * @param result     the cached metric value
   */
  case class CacheResult(timeStamp: Long, updateTime: Long, result: Metric) {
    // true when this cached entry predates the given update time
    def olderThan(ut: Long): Boolean = updateTime < ut

    /**
     * Merge an incoming metric into the cached one.
     * Returns Some(merged) only when the incoming metric is newer AND the
     * merge actually changed the cached value; None means "nothing to emit".
     */
    def update(ut: Long, r: Metric): Option[Metric] = {
      r match {
        // path-dependent type match: only a metric of the same concrete type
        // as `result` can be merged.
        // NOTE(review): result.T is erased at runtime, so this match is
        // unchecked — it relies on callers never mixing metric types per
        // timestamp; confirm against call sites.
        case m: result.T if (olderThan(ut)) => {
          val ur = result.update(m)
          if (result.differsFrom(ur)) Some(ur) else None
        }
        case _ => None
      }
    }
  }

  // cached results keyed by calculation-window timestamp.
  // NOTE(review): plain mutable map with no synchronization — assumes all
  // access happens on a single (driver) thread; confirm before reuse.
  private val cacheGroup: MutableMap[Long, CacheResult] = MutableMap()

  // overwrite (or insert) the cache entry for this result's timestamp
  private def update(r: CacheResult): Unit = {
    cacheGroup += (r.timeStamp -> r)
  }

  /**
   * Input new metric results, output only the results that changed the cache
   * (first-seen timestamps pass through as-is); the cache is updated with
   * every emitted result.
   */
  def update(cacheResults: Iterable[CacheResult]): Iterable[CacheResult] = {
    val updatedCacheResults = cacheResults.flatMap { cacheResult =>
      val CacheResult(t, ut, r) = cacheResult
      (cacheGroup.get(t) match {
        case Some(cr) => cr.update(ut, r)
        case _ => Some(r) // first result for this timestamp: take it as-is
      }).map(m => CacheResult(t, ut, m))
    }
    updatedCacheResults.foreach(r => update(r))
    updatedCacheResults
  }

  /**
   * Clean the out-of-time cached results, to avoid a memory leak: drop
   * entries whose window is older than `overtime` or whose metric is
   * eventual (can no longer change).
   */
  def refresh(overtime: Long): Unit = {
    // snapshot first so we don't mutate the map while filtering it
    val curCacheGroup = cacheGroup.toMap
    val deadCache = curCacheGroup.filter { pr =>
      val (_, cr) = pr
      cr.timeStamp < overtime || cr.result.eventual()
    }
    info(s"=== dead cache group count: ${deadCache.size} ===")
    deadCache.keySet.foreach(cacheGroup -= _)
  }

}
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.context.streaming.metric

/**
 * A streaming metric that can be incrementally merged as late data arrives
 * for the same calculation window. Serializable so it can travel in Spark
 * closures and be cached (see CacheResults).
 */
trait Metric extends Serializable {

  // concrete self type: update/differsFrom only accept metrics of the same kind
  type T <: Metric

  /** Whether this metric carries meaningful data; true unless overridden. */
  def isLegal(): Boolean = true

  /** Merge an incoming delta into this metric, returning the merged metric. */
  def update(delta: T): T

  /** True while the metric is still in its initial (no-progress) state. */
  def initial(): Boolean

  /** True once the metric can no longer change, so caches may drop it. */
  def eventual(): Boolean

  /** True when the two metrics would produce different outputs. */
  def differsFrom(other: T): Boolean

}
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.context.writer

import java.util.Date

import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
import org.apache.griffin.measure.utils.ParamUtil._
import org.apache.spark.rdd.RDD

/**
 * Persist metrics and records to HDFS, under `path/metricName/timeStamp/`.
 *
 * @param config     writer settings: path, max.persist.lines, max.lines.per.file
 * @param metricName metric name, used in the output path and metrics payload
 * @param timeStamp  calculation timestamp, used in the output path and payload
 */
case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {

  // config keys
  val Path = "path"
  val MaxPersistLines = "max.persist.lines"
  val MaxLinesPerFile = "max.lines.per.file"

  val path = config.getOrElse(Path, "").toString
  // total record cap; negative means unlimited
  val maxPersistLines = config.getInt(MaxPersistLines, -1)
  // per-file line cap, hard-limited to 1,000,000
  val maxLinesPerFile = math.min(config.getInt(MaxLinesPerFile, 10000), 1000000)

  // marker/metric file paths inside this run's output directory
  val StartFile = filePath("_START")
  val FinishFile = filePath("_FINISH")
  val MetricsFile = filePath("_METRICS")

  val LogFile = filePath("_LOG")

  // json field names for the metrics payload
  val _MetricName = "metricName"
  val _Timestamp = "timestamp"
  val _Value = "value"

  // true until the first log line is written; gates the log-file header.
  // NOTE(review): plain var, not thread-safe — assumes single-threaded logging; confirm
  var _init = true

  /** Usable only when an output path was configured. */
  def available(): Boolean = {
    path.nonEmpty
  }

  // header emitted once, before the first log entry
  private def logHead: String = {
    if (_init) {
      _init = false
      val dt = new Date(timeStamp)
      s"================ log of ${dt} ================\n"
    } else ""
  }

  // per-entry timestamp line
  private def timeHead(rt: Long): String = {
    val dt = new Date(rt)
    s"--- ${dt} ---\n"
  }

  // full log entry: optional header + time line + message
  private def logWrap(rt: Long, msg: String): String = {
    logHead + timeHead(rt) + s"${msg}\n\n"
  }

  /** Path of `file` inside this run's output directory. */
  protected def filePath(file: String): String = {
    HdfsUtil.getHdfsFilePath(path, s"${metricName}/${timeStamp}/${file}")
  }

  protected def withSuffix(path: String, suffix: String): String = {
    s"${path}.${suffix}"
  }

  /** Write the _START marker with the given message; errors are logged. */
  def start(msg: String): Unit = {
    try {
      HdfsUtil.writeContent(StartFile, msg)
    } catch {
      case e: Throwable => error(e.getMessage)
    }
  }

  /** Write the empty _FINISH marker; errors are logged. */
  def finish(): Unit = {
    try {
      HdfsUtil.createEmptyFile(FinishFile)
    } catch {
      case e: Throwable => error(e.getMessage)
    }
  }

  /** Append a timestamped message to the _LOG file; errors are logged. */
  def log(rt: Long, msg: String): Unit = {
    try {
      val logStr = logWrap(rt, msg)
      HdfsUtil.appendContent(LogFile, logStr)
    } catch {
      case e: Throwable => error(e.getMessage)
    }
  }

  private def getHdfsPath(path: String, groupId: Int): String = {
    HdfsUtil.getHdfsFilePath(path, s"${groupId}")
  }
  private def getHdfsPath(path: String, ptnId: Int, groupId: Int): String = {
    HdfsUtil.getHdfsFilePath(path, s"${ptnId}.${groupId}")
  }

  // remove output from a previous attempt for the same name
  private def clearOldRecords(path: String): Unit = {
    HdfsUtil.deleteHdfsPath(path)
  }

  /**
   * Persist an RDD of records under `name`, capped by maxPersistLines and
   * split into files of at most maxLinesPerFile lines. Group 0 keeps the
   * base path; group g > 0 gets suffix `.g`.
   */
  def persistRecords(records: RDD[String], name: String): Unit = {
    val path = filePath(name)
    clearOldRecords(path)
    try {
      val recordCount = records.count
      val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
      if (count > 0) {
        val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
        if (groupCount <= 1) {
          // small result: collect to the driver and write one file
          val recs = records.take(count.toInt)
          persistRecords2Hdfs(path, recs)
        } else {
          // large result: bucket by line index, one file per bucket.
          // NOTE(review): the foreach below runs on Spark executors, so
          // HdfsUtil and this instance must be serializable/usable there — confirm
          val groupedRecords: RDD[(Long, Iterable[String])] =
            records.zipWithIndex.flatMap { r =>
              val gid = r._2 / maxLinesPerFile
              if (gid < groupCount) Some((gid, r._1)) else None
            }.groupByKey()
          groupedRecords.foreach { group =>
            val (gid, recs) = group
            val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString)
            persistRecords2Hdfs(hdfsPath, recs)
          }
        }
      }
    } catch {
      case e: Throwable => error(e.getMessage)
    }
  }

  /**
   * Persist an in-memory collection of records under `name`, with the same
   * caps as the RDD overload.
   * NOTE(review): this overload names split files `<path>/<gid>` (including
   * group 0) while the RDD overload uses `<path>` / `<path>.<gid>` — the two
   * layouts differ; confirm whether that is intentional.
   */
  def persistRecords(records: Iterable[String], name: String): Unit = {
    val path = filePath(name)
    clearOldRecords(path)
    try {
      val recordCount = records.size
      val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
      if (count > 0) {
        val groupCount = (count - 1) / maxLinesPerFile + 1
        if (groupCount <= 1) {
          val recs = records.take(count.toInt)
          persistRecords2Hdfs(path, recs)
        } else {
          val groupedRecords = records.grouped(maxLinesPerFile).zipWithIndex
          groupedRecords.take(groupCount).foreach { group =>
            val (recs, gid) = group
            val hdfsPath = getHdfsPath(path, gid)
            persistRecords2Hdfs(hdfsPath, recs)
          }
        }
      }
    } catch {
      case e: Throwable => error(e.getMessage)
    }
  }

  /** Write the metrics map as one json document to the _METRICS file. */
  def persistMetrics(metrics: Map[String, Any]): Unit = {
    val head = Map[String, Any]((_MetricName -> metricName), (_Timestamp -> timeStamp))
    val result = head + (_Value -> metrics)
    try {
      val json = JsonUtil.toJson(result)
      persistRecords2Hdfs(MetricsFile, json :: Nil)
    } catch {
      case e: Throwable => error(e.getMessage)
    }
  }

  // write records as newline-joined content to one HDFS file; errors are logged
  private def persistRecords2Hdfs(hdfsPath: String, records: Iterable[String]): Unit = {
    try {
      val recStr = records.mkString("\n")
      HdfsUtil.writeContent(hdfsPath, recStr)
    } catch {
      case e: Throwable => error(e.getMessage)
    }
  }

}
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.context.writer

import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
import org.apache.griffin.measure.utils.ParamUtil._
import org.apache.spark.rdd.RDD

import scala.concurrent.Future

/**
 * Persist metrics through an HTTP request to a configured endpoint.
 * Records and logs are not persisted by this writer (no-op overrides).
 *
 * @param config     writer settings: api (endpoint url), method (default "post")
 * @param metricName metric name, sent as the "name" field
 * @param timeStamp  calculation timestamp, sent as the "tmst" field
 */
case class HttpPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {

  // config keys
  val Api = "api"
  val Method = "method"

  val api = config.getString(Api, "")
  val method = config.getString(Method, "post")

  val _Value = "value"

  /** Usable only when an endpoint url was configured. */
  def available(): Boolean = {
    api.nonEmpty
  }

  // start/finish markers have no meaning for an HTTP sink
  def start(msg: String): Unit = {}
  def finish(): Unit = {}

  /**
   * Serialize the payload and submit the HTTP call asynchronously via
   * PersistThreadPool (fire-and-forget; the task is retried up to 10 times
   * per the addTask argument — presumably; confirm PersistThreadPool's
   * contract).
   * NOTE(review): uses the global ExecutionContext inside the task — fine for
   * an edge sink, but a dedicated pool would isolate blocking HTTP calls.
   */
  private def httpResult(dataMap: Map[String, Any]) = {
    try {
      val data = JsonUtil.toJson(dataMap)
      // http request
      val params = Map[String, Object]()
      val header = Map[String, Object](("Content-Type","application/json"))

      def func(): (Long, Future[Boolean]) = {
        import scala.concurrent.ExecutionContext.Implicits.global
        (timeStamp, Future(HttpUtil.httpRequest(api, method, params, header, data)))
      }
      PersistThreadPool.addTask(func _, 10)
    } catch {
      case e: Throwable => error(e.getMessage)
    }

  }

  // no log persisting for the HTTP sink
  def log(rt: Long, msg: String): Unit = {}

  // no record persisting for the HTTP sink
  def persistRecords(records: RDD[String], name: String): Unit = {}
  def persistRecords(records: Iterable[String], name: String): Unit = {}

  /** Wrap the metrics with name/timestamp and send them to the endpoint. */
  def persistMetrics(metrics: Map[String, Any]): Unit = {
    val head = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp))
    val result = head + (_Value -> metrics)
    httpResult(result)
  }

}
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.context.writer

import org.apache.griffin.measure.utils.JsonUtil
import org.apache.griffin.measure.utils.ParamUtil._
import org.apache.spark.rdd.RDD

/**
 * Persist metrics to the console (stdout), for debugging.
 * Record persisting is intentionally a no-op: dumping full record sets to
 * stdout is too noisy to be useful.
 *
 * @param config     writer settings: max.log.lines (kept for record logging,
 *                   which is currently disabled)
 * @param metricName metric name, printed with every line
 * @param timeStamp  calculation timestamp, printed with every line
 */
case class LoggerPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {

  // config key
  val MaxLogLines = "max.log.lines"

  // cap on printed record lines; only relevant if record logging is re-enabled
  val maxLogLines = config.getInt(MaxLogLines, 100)

  /** The console is always available. */
  def available(): Boolean = true

  def start(msg: String): Unit =
    println(s"[${timeStamp}] ${metricName} start: ${msg}")

  def finish(): Unit =
    println(s"[${timeStamp}] ${metricName} finish")

  def log(rt: Long, msg: String): Unit =
    println(s"[${timeStamp}] ${rt}: ${msg}")

  // record persisting deliberately disabled for the console writer
  def persistRecords(records: RDD[String], name: String): Unit = {}

  def persistRecords(records: Iterable[String], name: String): Unit = {}

  /** Print the metrics map as one json document. */
  def persistMetrics(metrics: Map[String, Any]): Unit = {
    println(s"${metricName} [${timeStamp}] metrics: ")
    val json = JsonUtil.toJson(metrics)
    println(json)
  }

}
---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/writer/MongoPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/context/writer/MongoPersist.scala new file mode 100644 index 0000000..3cfcf04 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/context/writer/MongoPersist.scala @@ -0,0 +1,118 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
package org.apache.griffin.measure.context.writer

import org.apache.griffin.measure.utils.ParamUtil._
import org.apache.spark.rdd.RDD
import org.mongodb.scala._
import org.mongodb.scala.model.{Filters, UpdateOptions, Updates}
import org.mongodb.scala.result.UpdateResult

import scala.concurrent.Future

/**
 * Persist metrics to mongo. Records are not persisted by this writer.
 * Each metrics map is upserted into a single document keyed by
 * (metricName, timestamp).
 */
case class MongoPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {

  // one-time (per JVM) shared connection setup
  MongoConnection.init(config)

  val _MetricName = "metricName"
  val _Timestamp = "timestamp"
  val _Value = "value"

  def available(): Boolean = MongoConnection.dataConf.available

  def start(msg: String): Unit = {}
  def finish(): Unit = {}

  def log(rt: Long, msg: String): Unit = {}

  // record persisting is a no-op: the mongo writer only stores metrics
  def persistRecords(records: RDD[String], name: String): Unit = {}
  def persistRecords(records: Iterable[String], name: String): Unit = {}

  def persistMetrics(metrics: Map[String, Any]): Unit = {
    mongoInsert(metrics)
  }

  // selects the document for this metric instance (by name and timestamp)
  private val filter = Filters.and(
    Filters.eq(_MetricName, metricName),
    Filters.eq(_Timestamp, timeStamp)
  )

  /**
   * Upsert the metric document asynchronously via the persist thread pool,
   * retrying up to 10 times on failure. Errors building the task are
   * logged and swallowed (persisting is best-effort).
   */
  private def mongoInsert(dataMap: Map[String, Any]): Unit = {
    try {
      val update = Updates.set(_Value, dataMap)
      def func(): (Long, Future[UpdateResult]) = {
        (timeStamp, MongoConnection.getDataCollection.updateOne(
          filter, update, UpdateOptions().upsert(true)).toFuture)
      }
      PersistThreadPool.addTask(func _, 10)
    } catch {
      case e: Throwable => error(e.getMessage)
    }
  }

}

/**
 * Holds the shared mongo configuration and collection handle,
 * initialized once per JVM.
 */
object MongoConnection {

  case class MongoConf(url: String, database: String, collection: String) {
    def available: Boolean = url.nonEmpty && database.nonEmpty && collection.nonEmpty
  }

  val _MongoHead = "mongodb://"

  val Url = "url"
  val Database = "database"
  val Collection = "collection"

  private var initialed = false

  var dataConf: MongoConf = _
  private var dataCollection: MongoCollection[Document] = _

  def getDataCollection = dataCollection

  /**
   * Initialize the shared connection exactly once.
   * Fixed: the check-then-set on `initialed` was unsynchronized, so two
   * MongoPersist instances constructed concurrently could race and build
   * two mongo clients; the whole init is now synchronized.
   */
  def init(config: Map[String, Any]): Unit = synchronized {
    if (!initialed) {
      dataConf = mongoConf(config)
      dataCollection = mongoCollection(dataConf)
      initialed = true
    }
  }

  // read url/database/collection from config, normalizing the url scheme
  private def mongoConf(cfg: Map[String, Any]): MongoConf = {
    val url = cfg.getString(Url, "").trim
    val mongoUrl = if (url.startsWith(_MongoHead)) url else _MongoHead + url
    MongoConf(
      mongoUrl,
      cfg.getString(Database, ""),
      cfg.getString(Collection, "")
    )
  }

  private def mongoCollection(mongoConf: MongoConf): MongoCollection[Document] = {
    val mongoClient: MongoClient = MongoClient(mongoConf.url)
    val database: MongoDatabase = mongoClient.getDatabase(mongoConf.database)
    database.getCollection(mongoConf.collection)
  }

}
package org.apache.griffin.measure.context.writer

import org.apache.spark.rdd.RDD

/**
 * Persist metrics and records in multiple ways, delegating every operation
 * to each wrapped Persist. A failure of one writer is logged and does not
 * prevent the remaining writers from running.
 */
case class MultiPersists(persists: Iterable[Persist]) extends Persist {

  // Fixed: `persists match { case Nil => ... }` only matches when the
  // Iterable happens to be a List; any other empty Iterable fell through
  // to `persists.head` and threw. headOption is total over any Iterable.
  val metricName: String = persists.headOption.map(_.metricName).getOrElse("")

  val timeStamp: Long = persists.headOption.map(_.timeStamp).getOrElse(0L)

  val config: Map[String, Any] = Map[String, Any]()

  // available when at least one wrapped persist is available
  def available(): Boolean = { persists.exists(_.available()) }

  def start(msg: String): Unit = { persists.foreach(_.start(msg)) }
  def finish(): Unit = { persists.foreach(_.finish()) }

  def log(rt: Long, msg: String): Unit = {
    persists.foreach { persist =>
      try {
        persist.log(rt, msg)
      } catch {
        case e: Throwable => error(s"log error: ${e.getMessage}")
      }
    }
  }

  def persistRecords(records: RDD[String], name: String): Unit = {
    persists.foreach { persist =>
      try {
        persist.persistRecords(records, name)
      } catch {
        case e: Throwable => error(s"persist records error: ${e.getMessage}")
      }
    }
  }

  def persistRecords(records: Iterable[String], name: String): Unit = {
    persists.foreach { persist =>
      try {
        persist.persistRecords(records, name)
      } catch {
        case e: Throwable => error(s"persist records error: ${e.getMessage}")
      }
    }
  }

  def persistMetrics(metrics: Map[String, Any]): Unit = {
    persists.foreach { persist =>
      try {
        persist.persistMetrics(metrics)
      } catch {
        case e: Throwable => error(s"persist metrics error: ${e.getMessage}")
      }
    }
  }

}
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/Persist.scala b/measure/src/main/scala/org/apache/griffin/measure/context/writer/Persist.scala new file mode 100644 index 0000000..28eeb64 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/context/writer/Persist.scala @@ -0,0 +1,45 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
package org.apache.griffin.measure.context.writer

import org.apache.griffin.measure.Loggable
import org.apache.spark.rdd.RDD

/**
 * Contract for writers that persist the metrics and records produced by
 * one measure calculation. Implementations are Serializable so they can
 * be shipped with Spark jobs.
 */
trait Persist extends Loggable with Serializable {

  /** name of the metric this writer instance belongs to */
  val metricName: String

  /** timestamp of the calculation this writer instance belongs to */
  val timeStamp: Long

  /** writer-specific configuration map */
  val config: Map[String, Any]

  /** whether this writer is usable with its current configuration */
  def available(): Boolean

  /** mark the beginning of a persist session */
  def start(msg: String): Unit

  /** mark the end of a persist session */
  def finish(): Unit

  /** persist a log message, tagged with `rt` (a time value) */
  def log(rt: Long, msg: String): Unit

  /** persist records held in an RDD, under the given name */
  def persistRecords(records: RDD[String], name: String): Unit

  /** persist records held in a local collection, under the given name */
  def persistRecords(records: Iterable[String], name: String): Unit

  /** persist the calculated metric values */
  def persistMetrics(metrics: Map[String, Any]): Unit

}
package org.apache.griffin.measure.context.writer

import org.apache.griffin.measure.configuration.params.PersistParam

import scala.util.{Failure, Success, Try}

/**
 * Creates Persist instances from persist parameters.
 */
case class PersistFactory(persistParams: Iterable[PersistParam], metricName: String) extends Serializable {

  val HDFS_REGEX = """^(?i)hdfs$""".r
  val HTTP_REGEX = """^(?i)http$""".r
  val LOG_REGEX = """^(?i)log$""".r
  val MONGO_REGEX = """^(?i)mongo$""".r

  /** build a MultiPersists wrapping every available persist for this timestamp */
  def getPersists(timeStamp: Long): MultiPersists = {
    MultiPersists(persistParams.flatMap(param => getPersist(timeStamp, param)))
  }

  /**
   * Create a persist from its param. Returns None when the type is
   * unsupported, construction fails, or the persist reports itself
   * unavailable.
   */
  private def getPersist(timeStamp: Long, persistParam: PersistParam): Option[Persist] = {
    val config = persistParam.config
    // Fixed: the unsupported-type branch used to throw directly out of the
    // match (outside any Try), so a single unknown persist type crashed
    // getPersists instead of being skipped; it is now a Failure, filtered
    // to None like every other construction failure.
    val persistTry = persistParam.persistType match {
      case HDFS_REGEX() => Try(HdfsPersist(config, metricName, timeStamp))
      case HTTP_REGEX() => Try(HttpPersist(config, metricName, timeStamp))
      case LOG_REGEX() => Try(LoggerPersist(config, metricName, timeStamp))
      case MONGO_REGEX() => Try(MongoPersist(config, metricName, timeStamp))
      case _ => Failure(new Exception("not supported persist type"))
    }
    persistTry match {
      case Success(persist) if (persist.available) => Some(persist)
      case _ => None
    }
  }

}
package org.apache.griffin.measure.context.writer

import java.util.Date
import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}

import org.apache.griffin.measure.Loggable

import scala.concurrent.Future
import scala.util.{Failure, Success}

/**
 * Fixed-size thread pool that runs persist tasks asynchronously,
 * re-submitting failed tasks up to a bounded retry count.
 */
object PersistThreadPool extends Loggable {

  // completion callbacks run on the default execution context
  import scala.concurrent.ExecutionContext.Implicits.global

  // 5 worker threads; the work queue is unbounded
  private val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]
  val MAX_RETRY = 100

  def start(): Unit = {
  }

  /** stop accepting tasks and wait up to 10s for in-flight ones to finish */
  def shutdown(): Unit = {
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
  }

  /**
   * Submit a persist task. `func` yields a task id and the Future to wait
   * on; a negative `retry` means the default MAX_RETRY.
   */
  def addTask(func: () => (Long, Future[_]), retry: Int): Unit = {
    val retries = if (retry < 0) MAX_RETRY else retry
    info(s"add task, current task num: ${pool.getQueue.size}")
    pool.submit(Task(func, retries))
  }

  /** one submission of a persist task, carrying its remaining retry budget */
  case class Task(func: () => (Long, Future[_]), retry: Int) extends Runnable with Loggable {

    override def run(): Unit = {
      val startMs = new Date().getTime
      val (taskId, result) = func()
      result.onComplete {
        case Success(_) =>
          val endMs = new Date().getTime
          info(s"task ${taskId} success [ using time ${endMs - startMs} ms ]")
        case Failure(e) =>
          val endMs = new Date().getTime
          warn(s"task ${taskId} fails [ using time ${endMs - startMs} ms ] : ${e.getMessage}")
          if (retry > 0) {
            // re-submit with one fewer retry remaining
            info(s"task ${taskId} retry [ rest retry count: ${retry - 1} ]")
            pool.submit(Task(func, retry - 1))
          } else {
            fail(s"task ${taskId} retry ends but fails")
          }
      }
    }

    def fail(msg: String): Unit = {
      error(s"task fails: ${msg}")
    }
  }

}
-*/ -package org.apache.griffin.measure.data.connector - -import java.util.concurrent.atomic.AtomicLong - -import org.apache.griffin.measure.cache.tmst.TmstCache -import org.apache.griffin.measure.config.params.user.DataConnectorParam -import org.apache.griffin.measure.log.Loggable -import org.apache.griffin.measure.process.{BatchDqProcess, BatchProcessType} -import org.apache.griffin.measure.process.engine._ -import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters, TimeRange} -import org.apache.griffin.measure.rule.adaptor.{InternalColumns, PreProcPhase, RuleAdaptorGroup, RunPhase} -import org.apache.griffin.measure.rule.dsl._ -import org.apache.griffin.measure.rule.plan._ -import org.apache.griffin.measure.rule.preproc.PreProcRuleGenerator -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.{DataFrame, SQLContext} - - -trait DataConnector extends Loggable with Serializable { - -// def available(): Boolean - - var tmstCache: TmstCache = _ - protected def saveTmst(t: Long) = tmstCache.insert(t) - protected def readTmst(t: Long) = tmstCache.range(t, t + 1) - - def init(): Unit - - def data(ms: Long): (Option[DataFrame], TimeRange) - - val dqEngines: DqEngines - - val dcParam: DataConnectorParam - - val sqlContext: SQLContext - - val id: String = DataConnectorIdGenerator.genId - - protected def suffix(ms: Long): String = s"${id}_${ms}" - protected def thisName(ms: Long): String = s"this_${suffix(ms)}" - - final val tmstColName = InternalColumns.tmst - - def preProcess(dfOpt: Option[DataFrame], ms: Long): Option[DataFrame] = { - val timeInfo = CalcTimeInfo(ms, id) - val thisTable = thisName(ms) - - try { - saveTmst(ms) // save tmst - - dfOpt.flatMap { df => - val preProcRules = PreProcRuleGenerator.genPreProcRules(dcParam.preProc, suffix(ms)) - - // init data - TableRegisters.registerRunTempTable(df, timeInfo.key, thisTable) - -// val dsTmsts = Map[String, Set[Long]]((thisTable -> 
Set[Long](ms))) -// val tmsts = Seq[Long](ms) - val dsTimeRanges = Map[String, TimeRange]((thisTable -> TimeRange(ms))) - - // generate rule steps - val rulePlan = RuleAdaptorGroup.genRulePlan( - timeInfo, preProcRules, SparkSqlType, BatchProcessType, dsTimeRanges) - - // run rules - dqEngines.runRuleSteps(timeInfo, rulePlan.ruleSteps) - - // out data - val outDf = sqlContext.table(s"`${thisTable}`") - -// names.foreach { name => -// try { -// TempTables.unregisterTempTable(sqlContext, ms, name) -// } catch { -// case e: Throwable => warn(s"drop temp table ${name} fails") -// } -// } - -// val range = if (id == "dc1") (0 until 20).toList else (0 until 1).toList -// val withTmstDfs = range.map { i => -// saveTmst(ms + i) -// outDf.withColumn(tmstColName, lit(ms + i)).limit(49 - i) -// } -// Some(withTmstDfs.reduce(_ unionAll _)) - - // add tmst column - val withTmstDf = outDf.withColumn(tmstColName, lit(ms)) - - // tmst cache -// saveTmst(ms) - - // drop temp tables - cleanData(timeInfo) - - Some(withTmstDf) - } - } catch { - case e: Throwable => { - error(s"pre-process of data connector [${id}] error: ${e.getMessage}") - None - } - } - - } - - private def cleanData(timeInfo: TimeInfo): Unit = { - TableRegisters.unregisterRunTempTables(sqlContext, timeInfo.key) - TableRegisters.unregisterCompileTempTables(timeInfo.key) - - DataFrameCaches.uncacheDataFrames(timeInfo.key) - DataFrameCaches.clearTrashDataFrames(timeInfo.key) - } - -} - -object DataConnectorIdGenerator { - private val counter: AtomicLong = new AtomicLong(0L) - private val head: String = "dc" - - def genId: String = { - s"${head}${increment}" - } - - private def increment: Long = { - counter.incrementAndGet() - } -} - http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala ---------------------------------------------------------------------- diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala deleted file mode 100644 index 27b390a..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala +++ /dev/null @@ -1,147 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.data.connector - -import kafka.serializer.StringDecoder -import org.apache.griffin.measure.config.params.user._ -import org.apache.griffin.measure.data.connector.streaming.{KafkaStreamingDataConnector, KafkaStreamingStringDataConnector, StreamingDataConnector} -import org.apache.griffin.measure.process.engine.{DqEngine, DqEngines} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.DataFrame - -import scala.util.Success -//import org.apache.griffin.measure.data.connector.cache._ -import org.apache.griffin.measure.data.connector.batch._ -//import org.apache.griffin.measure.data.connector.streaming._ -import org.apache.spark.sql.SQLContext -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.dstream.InputDStream -import org.apache.spark.streaming.kafka.KafkaUtils - -import scala.reflect.ClassTag -import scala.util.Try - -object DataConnectorFactory { - - val HiveRegex = """^(?i)hive$""".r - val AvroRegex = """^(?i)avro$""".r - val TextDirRegex = """^(?i)text-dir$""".r - - val KafkaRegex = """^(?i)kafka$""".r - - val TextRegex = """^(?i)text$""".r - - def getDataConnector(sqlContext: SQLContext, - @transient ssc: StreamingContext, - dqEngines: DqEngines, - dataConnectorParam: DataConnectorParam - ): Try[DataConnector] = { - val conType = dataConnectorParam.conType - val version = dataConnectorParam.version - val config = dataConnectorParam.config - Try { - conType match { - case HiveRegex() => HiveBatchDataConnector(sqlContext, dqEngines, dataConnectorParam) - case AvroRegex() => AvroBatchDataConnector(sqlContext, dqEngines, dataConnectorParam) - case TextDirRegex() => TextDirBatchDataConnector(sqlContext, dqEngines, dataConnectorParam) - case KafkaRegex() => { - getStreamingDataConnector(sqlContext, ssc, dqEngines, dataConnectorParam) - } - case _ => throw new Exception("connector creation error!") - } - } - } - - private def getStreamingDataConnector(sqlContext: SQLContext, - 
@transient ssc: StreamingContext, - dqEngines: DqEngines, - dataConnectorParam: DataConnectorParam - ): StreamingDataConnector = { - if (ssc == null) throw new Exception("streaming context is null!") - val conType = dataConnectorParam.conType - val version = dataConnectorParam.version - conType match { - case KafkaRegex() => genKafkaDataConnector(sqlContext, ssc, dqEngines, dataConnectorParam) - case _ => throw new Exception("streaming connector creation error!") - } - } -// -// private def getCacheDataConnector(sqlContext: SQLContext, -// dataCacheParam: DataCacheParam -// ): Try[CacheDataConnector] = { -// if (dataCacheParam == null) { -// throw new Exception("invalid data cache param!") -// } -// val cacheType = dataCacheParam.cacheType -// Try { -// cacheType match { -// case HiveRegex() => HiveCacheDataConnector(sqlContext, dataCacheParam) -// case TextRegex() => TextCacheDataConnector(sqlContext, dataCacheParam) -// case _ => throw new Exception("cache connector creation error!") -// } -// } -// } -// - private def genKafkaDataConnector(sqlContext: SQLContext, - @transient ssc: StreamingContext, - dqEngines: DqEngines, - dataConnectorParam: DataConnectorParam - ) = { - val config = dataConnectorParam.config - val KeyType = "key.type" - val ValueType = "value.type" - val keyType = config.getOrElse(KeyType, "java.lang.String").toString - val valueType = config.getOrElse(ValueType, "java.lang.String").toString - (getClassTag(keyType), getClassTag(valueType)) match { - case (ClassTag(k: Class[String]), ClassTag(v: Class[String])) => { - KafkaStreamingStringDataConnector(sqlContext, ssc, dqEngines, dataConnectorParam) - } - case _ => { - throw new Exception("not supported type kafka data connector") - } - } - } - - private def getClassTag(tp: String): ClassTag[_] = { - try { - val clazz = Class.forName(tp) - ClassTag(clazz) - } catch { - case e: Throwable => throw e - } - } - - def filterBatchDataConnectors(connectors: Seq[DataConnector]): Seq[BatchDataConnector] 
= { - connectors.flatMap { dc => - dc match { - case mdc: BatchDataConnector => Some(mdc) - case _ => None - } - } - } - def filterStreamingDataConnectors(connectors: Seq[DataConnector]): Seq[StreamingDataConnector] = { - connectors.flatMap { dc => - dc match { - case mdc: StreamingDataConnector => Some(mdc) - case _ => None - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala deleted file mode 100644 index 5a1c22c..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala +++ /dev/null @@ -1,115 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.data.connector.batch - -import org.apache.griffin.measure.config.params.user.DataConnectorParam -import org.apache.griffin.measure.data.connector._ -import org.apache.griffin.measure.process.engine.DqEngines -import org.apache.griffin.measure.process.temp.TimeRange -import org.apache.griffin.measure.result._ -import org.apache.griffin.measure.utils.HdfsUtil -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, SQLContext} -import org.apache.griffin.measure.utils.ParamUtil._ - -import scala.util.Try - -// data connector for avro file -case class AvroBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines, dcParam: DataConnectorParam - ) extends BatchDataConnector { - - val config = dcParam.config - - val FilePath = "file.path" - val FileName = "file.name" - - val filePath = config.getString(FilePath, "") - val fileName = config.getString(FileName, "") - - val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName - - private def pathPrefix(): Boolean = { - filePath.nonEmpty - } - - private def fileExist(): Boolean = { - HdfsUtil.existPath(concreteFileFullPath) - } - - def data(ms: Long): (Option[DataFrame], TimeRange) = { - val dfOpt = try { - val df = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath) - val dfOpt = Some(df) - val preDfOpt = preProcess(dfOpt, ms) - preDfOpt - } catch { - case e: Throwable => { - error(s"load avro file ${concreteFileFullPath} fails") - None - } - } - val tmsts = readTmst(ms) - (dfOpt, TimeRange(ms, tmsts)) - } - -// def available(): Boolean = { -// (!concreteFileFullPath.isEmpty) && fileExist -// } - -// def init(): Unit = {} - -// def metaData(): Try[Iterable[(String, String)]] = { -// Try { -// val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema -// st.fields.map(f => (f.name, f.dataType.typeName)) -// } -// } - -// def data(): Try[RDD[(Product, (Map[String, Any], 
Map[String, Any]))]] = { -// Try { -// loadDataFile.flatMap { row => -// // generate cache data -// val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(row), ruleExprs.cacheExprs, constFinalExprValueMap) -// val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps) -// -// // data info -// val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info => -// try { -// (info.key -> row.getAs[info.T](info.key)) -// } catch { -// case e: Throwable => info.defWrap -// } -// }.toMap -// -// finalExprValueMaps.flatMap { finalExprValueMap => -// val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr => -// expr.calculate(finalExprValueMap) match { -// case Some(v) => Some(v.asInstanceOf[AnyRef]) -// case _ => None -// } -// } -// val key = toTuple(groupbyData) -// -// Some((key, (finalExprValueMap, dataInfoMap))) -// } -// } -// } -// } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/BatchDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/BatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/BatchDataConnector.scala deleted file mode 100644 index 4d138ab..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/BatchDataConnector.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.data.connector.batch - -import org.apache.griffin.measure.data.connector._ -//import org.apache.griffin.measure.data.connector.cache.DataUpdatable -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.types.StructType - -import scala.util.{Failure, Success, Try} - - -trait BatchDataConnector extends DataConnector { - -// def metaData(): Option[StructType] - - def init(): Unit = {} - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala deleted file mode 100644 index e57bd32..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala +++ /dev/null @@ -1,165 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.data.connector.batch - -import org.apache.griffin.measure.config.params.user.DataConnectorParam -import org.apache.griffin.measure.data.connector._ -import org.apache.griffin.measure.process.engine.DqEngines -import org.apache.griffin.measure.process.temp.TimeRange -import org.apache.griffin.measure.result._ -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.hive.HiveContext -import org.apache.spark.sql.{DataFrame, SQLContext} - -import scala.util.{Success, Try} -import org.apache.griffin.measure.utils.ParamUtil._ - -// data connector for hive -case class HiveBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines, dcParam: DataConnectorParam - ) extends BatchDataConnector { - - val config = dcParam.config - -// if (!sqlContext.isInstanceOf[HiveContext]) { -// throw new Exception("hive context not prepared!") -// } - - val Database = "database" - val TableName = "table.name" - val Where = "where" - - val database = config.getString(Database, "default") - val tableName = config.getString(TableName, "") - val whereString = config.getString(Where, "") - - val concreteTableName = s"${database}.${tableName}" -// val partitions = partitionsString.split(";").map(s => s.split(",").map(_.trim)) - val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty) -// val wheres: Array[Array[String]] = whereString.split(",").flatMap { s => -// val arr = s.trim.split(",").flatMap { t => -// t.trim match { -// case p if (p.nonEmpty) => Some(p) -// case _ => None -// } -// } -// if (arr.size > 0) Some(arr) else None -// } - - 
def data(ms: Long): (Option[DataFrame], TimeRange) = { - val dfOpt = try { - val dtSql = dataSql - info(dtSql) - val df = sqlContext.sql(dtSql) - val dfOpt = Some(df) - val preDfOpt = preProcess(dfOpt, ms) - preDfOpt - } catch { - case e: Throwable => { - error(s"load hive table ${concreteTableName} fails: ${e.getMessage}") - None - } - } - val tmsts = readTmst(ms) - (dfOpt, TimeRange(ms, tmsts)) - } - -// def available(): Boolean = { -// (!tableName.isEmpty) && { -// Try { -// if (dbPrefix) { -// sqlContext.tables(database).filter(tableExistsSql).collect.size -// } else { -// sqlContext.tables().filter(tableExistsSql).collect.size -// } -// } match { -// case Success(s) => s > 0 -// case _ => false -// } -// } -// } - -// def init(): Unit = {} - -// def metaData(): Try[Iterable[(String, String)]] = { -// Try { -// val originRows = sqlContext.sql(metaDataSql).map(r => (r.getString(0), r.getString(1))).collect -// val partitionPos: Int = originRows.indexWhere(pair => pair._1.startsWith("# ")) -// if (partitionPos < 0) originRows -// else originRows.take(partitionPos) -// } -// } - -// def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = { -// Try { -// sqlContext.sql(dataSql).flatMap { row => -// // generate cache data -// val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(row), ruleExprs.cacheExprs, constFinalExprValueMap) -// val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps) -// -// // data info -// val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info => -// try { -// (info.key -> row.getAs[info.T](info.key)) -// } catch { -// case e: Throwable => info.defWrap -// } -// }.toMap -// -// finalExprValueMaps.flatMap { finalExprValueMap => -// val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr => -// expr.calculate(finalExprValueMap) match { -// case Some(v) => Some(v.asInstanceOf[AnyRef]) -// case _ => None -// } -// } -// val key = 
toTuple(groupbyData) -// -// Some((key, (finalExprValueMap, dataInfoMap))) -// } -// } -// } -// } - - private def tableExistsSql(): String = { -// s"SHOW TABLES LIKE '${concreteTableName}'" // this is hive sql, but not work for spark sql - s"tableName LIKE '${tableName}'" - } - - private def metaDataSql(): String = { - s"DESCRIBE ${concreteTableName}" - } - - private def dataSql(): String = { - val tableClause = s"SELECT * FROM ${concreteTableName}" - if (wheres.length > 0) { - val clauses = wheres.map { w => - s"${tableClause} WHERE ${w}" - } - clauses.mkString(" UNION ALL ") - } else tableClause - } - -// private def toTuple[A <: AnyRef](as: Seq[A]): Product = { -// if (as.size > 0) { -// val tupleClass = Class.forName("scala.Tuple" + as.size) -// tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product] -// } else None -// } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala deleted file mode 100644 index 16e79ae..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala +++ /dev/null @@ -1,139 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.data.connector.batch - -import org.apache.griffin.measure.config.params.user.DataConnectorParam -import org.apache.griffin.measure.process.engine.DqEngines -import org.apache.griffin.measure.process.temp.TimeRange -import org.apache.griffin.measure.utils.HdfsUtil -import org.apache.spark.sql.{DataFrame, SQLContext} -import org.apache.griffin.measure.utils.ParamUtil._ - -// data connector for avro file -case class TextDirBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines, dcParam: DataConnectorParam - ) extends BatchDataConnector { - - val config = dcParam.config - - val DirPath = "dir.path" - val DataDirDepth = "data.dir.depth" - val SuccessFile = "success.file" - val DoneFile = "done.file" - - val dirPath = config.getString(DirPath, "") - val dataDirDepth = config.getInt(DataDirDepth, 0) - val successFile = config.getString(SuccessFile, "_SUCCESS") - val doneFile = config.getString(DoneFile, "_DONE") - - val ignoreFilePrefix = "_" - - private def dirExist(): Boolean = { - HdfsUtil.existPath(dirPath) - } - - def data(ms: Long): (Option[DataFrame], TimeRange) = { - val dfOpt = try { - val dataDirs = listSubDirs(dirPath :: Nil, dataDirDepth, readable) - // touch done file for read dirs - dataDirs.foreach(dir => touchDone(dir)) - - val validDataDirs = dataDirs.filter(dir => !emptyDir(dir)) - - if (validDataDirs.nonEmpty) { - val df = sqlContext.read.text(validDataDirs: _*) - val dfOpt = Some(df) - val preDfOpt = preProcess(dfOpt, ms) - preDfOpt - } else { - None - } - } catch { - case e: Throwable => { - error(s"load text 
dir ${dirPath} fails: ${e.getMessage}") - None - } - } - val tmsts = readTmst(ms) - (dfOpt, TimeRange(ms, tmsts)) - } - - private def listSubDirs(paths: Seq[String], depth: Int, filteFunc: (String) => Boolean): Seq[String] = { - val subDirs = paths.flatMap { path => HdfsUtil.listSubPathsByType(path, "dir", true) } - if (depth <= 0) { - subDirs.filter(filteFunc) - } else { - listSubDirs(subDirs, depth - 1, filteFunc) - } - } - - private def readable(dir: String): Boolean = isSuccess(dir) && !isDone(dir) - private def isDone(dir: String): Boolean = HdfsUtil.existFileInDir(dir, doneFile) - private def isSuccess(dir: String): Boolean = HdfsUtil.existFileInDir(dir, successFile) - - private def touchDone(dir: String): Unit = HdfsUtil.createEmptyFile(HdfsUtil.getHdfsFilePath(dir, doneFile)) - - private def emptyDir(dir: String): Boolean = { - !HdfsUtil.listSubPathsByType(dir, "file").exists(!_.startsWith(ignoreFilePrefix)) - } - -// def available(): Boolean = { -// (!concreteFileFullPath.isEmpty) && fileExist -// } - -// def init(): Unit = {} - -// def metaData(): Try[Iterable[(String, String)]] = { -// Try { -// val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema -// st.fields.map(f => (f.name, f.dataType.typeName)) -// } -// } - -// def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = { -// Try { -// loadDataFile.flatMap { row => -// // generate cache data -// val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(row), ruleExprs.cacheExprs, constFinalExprValueMap) -// val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps) -// -// // data info -// val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info => -// try { -// (info.key -> row.getAs[info.T](info.key)) -// } catch { -// case e: Throwable => info.defWrap -// } -// }.toMap -// -// finalExprValueMaps.flatMap { finalExprValueMap => -// val groupbyData: Seq[AnyRef] = 
ruleExprs.groupbyExprs.flatMap { expr => -// expr.calculate(finalExprValueMap) match { -// case Some(v) => Some(v.asInstanceOf[AnyRef]) -// case _ => None -// } -// } -// val key = toTuple(groupbyData) -// -// Some((key, (finalExprValueMap, dataInfoMap))) -// } -// } -// } -// } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala deleted file mode 100644 index d46d6b7..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala +++ /dev/null @@ -1,87 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.data.connector.streaming - -import kafka.serializer.Decoder -import org.apache.spark.streaming.dstream.InputDStream - -import scala.util.{Failure, Success, Try} -import org.apache.griffin.measure.utils.ParamUtil._ - -trait KafkaStreamingDataConnector extends StreamingDataConnector { - - type KD <: Decoder[K] - type VD <: Decoder[V] - type OUT = (K, V) - - val config = dcParam.config - - val KafkaConfig = "kafka.config" - val Topics = "topics" - - val kafkaConfig = config.getAnyRef(KafkaConfig, Map[String, String]()) - val topics = config.getString(Topics, "") - - def available(): Boolean = { - true - } - - def init(): Unit = { - // register fan in - dataSourceCacheOpt.foreach(_.registerFanIn) - - val ds = stream match { - case Success(dstream) => dstream - case Failure(ex) => throw ex - } - ds.foreachRDD((rdd, time) => { - val ms = time.milliseconds - val saveDfOpt = try { - // coalesce partition number - val prlCount = rdd.sparkContext.defaultParallelism - val ptnCount = rdd.getNumPartitions - val repartitionedRdd = if (prlCount < ptnCount) { - rdd.coalesce(prlCount) - } else rdd - - val dfOpt = transform(repartitionedRdd) - - preProcess(dfOpt, ms) - } catch { - case e: Throwable => { - error(s"streaming data connector error: ${e.getMessage}") - None - } - } - - // save data frame - dataSourceCacheOpt.foreach(_.saveData(saveDfOpt, ms)) - }) - } - - def stream(): Try[InputDStream[OUT]] = Try { - val topicSet = topics.split(",").toSet - createDStream(topicSet) - } - - protected def createDStream(topicSet: Set[String]): InputDStream[OUT] -} - - - http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingStringDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingStringDataConnector.scala 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingStringDataConnector.scala deleted file mode 100644 index 038cb77..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingStringDataConnector.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.data.connector.streaming - -import kafka.serializer.StringDecoder -import org.apache.griffin.measure.config.params.user.DataConnectorParam -import org.apache.griffin.measure.process.engine.DqEngines -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType} -import org.apache.spark.sql.{DataFrame, Row, SQLContext} -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.dstream.InputDStream -import org.apache.spark.streaming.kafka.KafkaUtils -import org.apache.spark.sql.functions.lit - -case class KafkaStreamingStringDataConnector(sqlContext: SQLContext, - @transient ssc: StreamingContext, - dqEngines: DqEngines, - dcParam: DataConnectorParam - ) extends KafkaStreamingDataConnector { - type K = String - type KD = StringDecoder - type V = String - type VD = StringDecoder - - val valueColName = "value" - val schema = StructType(Array( - StructField(valueColName, StringType) - )) - - def createDStream(topicSet: Set[String]): InputDStream[OUT] = { - KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaConfig, topicSet) - } - - def transform(rdd: RDD[OUT]): Option[DataFrame] = { - if (rdd.isEmpty) None else { - try { - val rowRdd = rdd.map(d => Row(d._2)) - val df = sqlContext.createDataFrame(rowRdd, schema) - Some(df) - } catch { - case e: Throwable => { - error(s"streaming data transform fails") - None - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala deleted file mode 100644 index e52ca1b..0000000 --- 
a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.data.connector.streaming - -import org.apache.griffin.measure.data.connector._ -import org.apache.griffin.measure.data.source.cache._ -import org.apache.griffin.measure.process.temp.TimeRange -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.DataFrame -import org.apache.spark.streaming.dstream.InputDStream - -import scala.util.Try - - -trait StreamingDataConnector extends DataConnector { - - type K - type V - type OUT - - protected def stream(): Try[InputDStream[OUT]] - - def transform(rdd: RDD[OUT]): Option[DataFrame] - - def data(ms: Long): (Option[DataFrame], TimeRange) = (None, TimeRange.emptyTimeRange) - - var dataSourceCacheOpt: Option[DataSourceCache] = None - -}
