http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/KafkaCacheDirectDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/KafkaCacheDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/KafkaCacheDirectDataConnector.scala new file mode 100644 index 0000000..70ddcde --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/KafkaCacheDirectDataConnector.scala @@ -0,0 +1,125 @@ +///* +//Licensed to the Apache Software Foundation (ASF) under one +//or more contributor license agreements. See the NOTICE file +//distributed with this work for additional information +//regarding copyright ownership. The ASF licenses this file +//to you under the Apache License, Version 2.0 (the +//"License"); you may not use this file except in compliance +//with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, +//software distributed under the License is distributed on an +//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +//KIND, either express or implied. See the License for the +//specific language governing permissions and limitations +//under the License. 
+//*/ +//package org.apache.griffin.measure.data.connector.direct +// +//import org.apache.griffin.measure.config.params.user.DataConnectorParam +//import org.apache.griffin.measure.data.connector.DataConnectorFactory +//import org.apache.griffin.measure.data.connector.cache.CacheDataConnector +//import org.apache.griffin.measure.data.connector.streaming.StreamingDataConnector +//import org.apache.griffin.measure.result._ +//import org.apache.griffin.measure.rule._ +//import org.apache.spark.rdd.RDD +//import org.apache.spark.sql.SQLContext +//import org.apache.spark.streaming.StreamingContext +// +//import scala.util.{Failure, Success, Try} +// +//case class KafkaCacheDirectDataConnector(@transient streamingDataConnectorTry: Try[StreamingDataConnector], +// cacheDataConnectorTry: Try[CacheDataConnector], +// dataConnectorParam: DataConnectorParam, +// ruleExprs: RuleExprs, +// constFinalExprValueMap: Map[String, Any] +// ) extends StreamingCacheDirectDataConnector { +// +// val cacheDataConnector: CacheDataConnector = cacheDataConnectorTry match { +// case Success(cntr) => cntr +// case Failure(ex) => throw ex +// } +// @transient val streamingDataConnector: StreamingDataConnector = streamingDataConnectorTry match { +// case Success(cntr) => cntr +// case Failure(ex) => throw ex +// } +// +// protected def transform(rdd: RDD[(streamingDataConnector.K, streamingDataConnector.V)], +// ms: Long +// ): RDD[Map[String, Any]] = { +// val dataInfoMap = DataInfo.cacheInfoList.map(_.defWrap).toMap + TimeStampInfo.wrap(ms) +// +// rdd.flatMap { kv => +// val msg = kv._2 +// +// val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(msg), ruleExprs.cacheExprs, constFinalExprValueMap) +// val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps) +// +// finalExprValueMaps.map { vm => +// vm ++ dataInfoMap +// } +// } +// } +// +// def metaData(): Try[Iterable[(String, String)]] = Try { +// Map.empty[String, String] +// } 
+// +// def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = Try { +// cacheDataConnector.readData match { +// case Success(rdd) => { +// rdd.flatMap { row => +// val finalExprValueMap = ruleExprs.finalCacheExprs.flatMap { expr => +// row.get(expr._id).flatMap { d => +// Some((expr._id, d)) +// } +// }.toMap +// +// val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info => +// row.get(info.key) match { +// case Some(d) => (info.key -> d) +// case _ => info.defWrap +// } +// }.toMap +// +// val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr => +// expr.calculate(finalExprValueMap) match { +// case Some(v) => Some(v.asInstanceOf[AnyRef]) +// case _ => None +// } +// } +// val key = toTuple(groupbyData) +// +// Some((key, (finalExprValueMap, dataInfoMap))) +// } +// } +// case Failure(ex) => throw ex +// } +// } +// +// override def cleanOldData(): Unit = { +// cacheDataConnector.cleanOldData +// } +// +// override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = { +// if (dataConnectorParam.getMatchOnce) { +// cacheDataConnector.updateOldData(t, oldData) +// } +// } +// +// override def updateAllOldData(oldRdd: RDD[Map[String, Any]]): Unit = { +// if (dataConnectorParam.getMatchOnce) { +// cacheDataConnector.updateAllOldData(oldRdd) +// } +// } +// +// private def toTuple[A <: AnyRef](as: Seq[A]): Product = { +// if (as.size > 0) { +// val tupleClass = Class.forName("scala.Tuple" + as.size) +// tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product] +// } else None +// } +// +//}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/StreamingCacheDirectDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/StreamingCacheDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/StreamingCacheDirectDataConnector.scala new file mode 100644 index 0000000..dddf430 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/StreamingCacheDirectDataConnector.scala @@ -0,0 +1,60 @@ +///* +//Licensed to the Apache Software Foundation (ASF) under one +//or more contributor license agreements. See the NOTICE file +//distributed with this work for additional information +//regarding copyright ownership. The ASF licenses this file +//to you under the Apache License, Version 2.0 (the +//"License"); you may not use this file except in compliance +//with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, +//software distributed under the License is distributed on an +//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +//KIND, either express or implied. See the License for the +//specific language governing permissions and limitations +//under the License. 
+//*/ +//package org.apache.griffin.measure.data.connector.direct +// +//import org.apache.griffin.measure.data.connector.cache.CacheDataConnector +//import org.apache.griffin.measure.data.connector.streaming.StreamingDataConnector +//import org.apache.griffin.measure.result.{DataInfo, TimeStampInfo} +//import org.apache.griffin.measure.rule.ExprValueUtil +//import org.apache.spark.rdd.RDD +// +//import scala.util.{Failure, Success} +// +//trait StreamingCacheDirectDataConnector extends DirectDataConnector { +// +// val cacheDataConnector: CacheDataConnector +// @transient val streamingDataConnector: StreamingDataConnector +// +// def available(): Boolean = { +// cacheDataConnector.available && streamingDataConnector.available +// } +// +// def init(): Unit = { +// cacheDataConnector.init +// +// val ds = streamingDataConnector.stream match { +// case Success(dstream) => dstream +// case Failure(ex) => throw ex +// } +// +// ds.foreachRDD((rdd, time) => { +// val ms = time.milliseconds +// +// val valueMapRdd = transform(rdd, ms) +// +// // save data frame +// cacheDataConnector.saveData(valueMapRdd, ms) +// }) +// } +// +// protected def transform(rdd: RDD[(streamingDataConnector.K, streamingDataConnector.V)], +// ms: Long +// ): RDD[Map[String, Any]] +// +//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala new file mode 100644 index 0000000..abc547b --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala @@ -0,0 +1,136 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one 
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.connector.batch
+
+import org.apache.griffin.measure.config.params.user.DataConnectorParam
+import org.apache.griffin.measure.process.engine.DqEngines
+import org.apache.griffin.measure.utils.HdfsUtil
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+import scala.annotation.tailrec
+import scala.util.control.NonFatal
+
+/**
+  * Batch data connector that reads text files from sub-directories of a configured directory.
+  *
+  * Configuration keys read from `dcParam.config`:
+  *  - `dir.path`       root directory to scan (default: empty string)
+  *  - `data.dir.depth` how many extra directory levels to descend below the first level of
+  *                     sub-directories before looking for data directories (default: 0)
+  *  - `success.file`   marker file that must exist in a directory for it to be read (default: `_SUCCESS`)
+  *  - `done.file`      marker file created in a directory once it has been picked up (default: `_DONE`)
+  *
+  * A directory is read only when it contains the success file and does not yet contain the done file.
+  */
+case class TextDirBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines, dcParam: DataConnectorParam
+                                    ) extends BatchDataConnector {
+
+  val config = dcParam.config
+
+  val DirPath = "dir.path"
+  val DataDirDepth = "data.dir.depth"
+  val SuccessFile = "success.file"
+  val DoneFile = "done.file"
+
+  val dirPath = config.getString(DirPath, "")
+  val dataDirDepth = config.getInt(DataDirDepth, 0)
+  val successFile = config.getString(SuccessFile, "_SUCCESS")
+  val doneFile = config.getString(DoneFile, "_DONE")
+
+  // files whose name starts with this prefix (e.g. the marker files) do not count as data
+  val ignoreFilePrefix = "_"
+
+  // NOTE(review): currently not called anywhere in this class.
+  private def dirExist(): Boolean = {
+    HdfsUtil.existPath(dirPath)
+  }
+
+  /**
+    * Loads all readable, non-empty data directories as one text data frame and pre-processes it.
+    *
+    * @param ms timestamp in milliseconds, passed through to `preProcess`
+    * @return the pre-processed data frame, or `None` when there is no directory to read
+    *         or when loading fails with a non-fatal error (the error is logged)
+    */
+  def data(ms: Long): Option[DataFrame] = {
+    try {
+      val dataDirs = listSubDirs(dirPath :: Nil, dataDirDepth, readable)
+      // mark every picked-up directory as done *before* reading, so that it is not read again
+      dataDirs.foreach(touchDone)
+
+      val validDataDirs = dataDirs.filterNot(emptyDir)
+
+      if (validDataDirs.nonEmpty) {
+        val df = sqlContext.read.text(validDataDirs: _*)
+        preProcess(Some(df), ms)
+      } else {
+        None
+      }
+    } catch {
+      // only non-fatal errors are swallowed; fatal JVM errors (e.g. OutOfMemoryError) propagate
+      case NonFatal(e) =>
+        error(s"load text dir ${dirPath} fails: ${e.getMessage}")
+        None
+    }
+  }
+
+  /**
+    * Lists the sub-directories of `paths`, descending `depth` further levels, and keeps
+    * those of the final level that satisfy `filteFunc`.
+    *
+    * The depth is decremented on every recursive step; recursing with an unchanged depth
+    * would never reach the base case for any `depth > 0`.
+    *
+    * @param paths     directories whose sub-directories are listed
+    * @param depth     number of additional levels to descend; values <= 0 stop at the current level
+    * @param filteFunc predicate applied to the directories of the final level
+    * @return the matching directories (empty when an intermediate level has no sub-directories)
+    */
+  @tailrec
+  private def listSubDirs(paths: Seq[String], depth: Int, filteFunc: (String) => Boolean): Seq[String] = {
+    // presumably the third argument asks HdfsUtil for full paths -- TODO confirm against HdfsUtil
+    val subDirs = paths.flatMap { path => HdfsUtil.listSubPathsByType(path, "dir", true) }
+    if (depth <= 0) {
+      subDirs.filter(filteFunc)
+    } else {
+      listSubDirs(subDirs, depth - 1, filteFunc)
+    }
+  }
+
+  // a directory is readable when it has the success marker and has not been marked done yet
+  private def readable(dir: String): Boolean = isSuccess(dir) && !isDone(dir)
+  private def isDone(dir: String): Boolean = HdfsUtil.existFileInDir(dir, doneFile)
+  private def isSuccess(dir: String): Boolean = HdfsUtil.existFileInDir(dir, successFile)
+
+  private def touchDone(dir: String): Unit = HdfsUtil.createEmptyFile(HdfsUtil.getHdfsFilePath(dir, doneFile))
+
+  // a directory is empty when every listed file starts with the ignore prefix (or there is none)
+  private def emptyDir(dir: String): Boolean = {
+    HdfsUtil.listSubPathsByType(dir, "file").forall(_.startsWith(ignoreFilePrefix))
+  }
+
+//  def available(): Boolean = {
+//    (!concreteFileFullPath.isEmpty) && fileExist
+//  }
+
+//  def init(): Unit = {}
+
+//  def metaData(): Try[Iterable[(String, String)]] = {
+//    Try {
+//      val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
+//      st.fields.map(f => (f.name, f.dataType.typeName))
+//    }
+//  }
+
+//  def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = {
+//    Try {
+//      loadDataFile.flatMap { row =>
+//        // generate cache data
+//        val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(row), ruleExprs.cacheExprs, constFinalExprValueMap)
+//        val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
+//
+//        // data info
+//        val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info =>
+//          try {
+//            (info.key -> row.getAs[info.T](info.key))
+//          } catch {
+//            case e: Throwable => info.defWrap
+//          }
+//        }.toMap
+//
+//        finalExprValueMaps.flatMap { finalExprValueMap =>
+//          val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
+//            expr.calculate(finalExprValueMap) match {
+//              case Some(v) => Some(v.asInstanceOf[AnyRef])
+//              case _ => None
+//            }
+//          }
+//          val key = toTuple(groupbyData)
+//
+//          Some((key, (finalExprValueMap, dataInfoMap)))
+//        }
+//      }
+//    }
+//  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/CacheDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/CacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/CacheDataConnector.scala
new file mode 100644
index 0000000..67dcc06
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/CacheDataConnector.scala
@@ -0,0 +1,33 @@
+///*
+//Licensed to the Apache Software Foundation (ASF) under one
+//or more contributor license agreements. See the NOTICE file
+//distributed with this work for additional information
+//regarding copyright ownership. The ASF licenses this file
+//to you under the Apache License, Version 2.0 (the
+//"License"); you may not use this file except in compliance
+//with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing,
+//software distributed under the License is distributed on an
+//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//KIND, either express or implied. See the License for the
+//specific language governing permissions and limitations
+//under the License.
+//*/ +//package org.apache.griffin.measure.data.connector.cache +// +//import org.apache.griffin.measure.data.connector.DataConnector +//import org.apache.spark.rdd.RDD +// +//import scala.util.Try +// +//trait CacheDataConnector extends DataConnector with DataCacheable with DataUpdatable { +// +// def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit +// +// def readData(): Try[RDD[Map[String, Any]]] +// +//} +// http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataCacheable.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataCacheable.scala new file mode 100644 index 0000000..79162be --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataCacheable.scala @@ -0,0 +1,86 @@ +///* +//Licensed to the Apache Software Foundation (ASF) under one +//or more contributor license agreements. See the NOTICE file +//distributed with this work for additional information +//regarding copyright ownership. The ASF licenses this file +//to you under the Apache License, Version 2.0 (the +//"License"); you may not use this file except in compliance +//with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, +//software distributed under the License is distributed on an +//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +//KIND, either express or implied. See the License for the +//specific language governing permissions and limitations +//under the License. 
+//*/ +//package org.apache.griffin.measure.data.connector.cache +// +//import java.util.concurrent.atomic.AtomicLong +// +//import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache} +// +//trait DataCacheable { +// +// protected val defCacheInfoPath = PathCounter.genPath +// +// val cacheInfoPath: String +// val readyTimeInterval: Long +// val readyTimeDelay: Long +// +// def selfCacheInfoPath = s"${TimeInfoCache.infoPath}/${cacheInfoPath}" +// +// def selfCacheTime = TimeInfoCache.cacheTime(selfCacheInfoPath) +// def selfLastProcTime = TimeInfoCache.lastProcTime(selfCacheInfoPath) +// def selfReadyTime = TimeInfoCache.readyTime(selfCacheInfoPath) +// def selfCleanTime = TimeInfoCache.cleanTime(selfCacheInfoPath) +// +// protected def submitCacheTime(ms: Long): Unit = { +// val map = Map[String, String]((selfCacheTime -> ms.toString)) +// InfoCacheInstance.cacheInfo(map) +// } +// +// protected def submitReadyTime(ms: Long): Unit = { +// val curReadyTime = ms - readyTimeDelay +// if (curReadyTime % readyTimeInterval == 0) { +// val map = Map[String, String]((selfReadyTime -> curReadyTime.toString)) +// InfoCacheInstance.cacheInfo(map) +// } +// } +// +// protected def submitLastProcTime(ms: Long): Unit = { +// val map = Map[String, String]((selfLastProcTime -> ms.toString)) +// InfoCacheInstance.cacheInfo(map) +// } +// +// protected def submitCleanTime(ms: Long): Unit = { +// val cleanTime = genCleanTime(ms) +// val map = Map[String, String]((selfCleanTime -> cleanTime.toString)) +// InfoCacheInstance.cacheInfo(map) +// } +// +// protected def genCleanTime(ms: Long): Long = ms +// +// protected def readCleanTime(): Option[Long] = { +// val key = selfCleanTime +// val keys = key :: Nil +// InfoCacheInstance.readInfo(keys).get(key).flatMap { v => +// try { +// Some(v.toLong) +// } catch { +// case _ => None +// } +// } +// } +// +//} +// +//object PathCounter { +// private val counter: AtomicLong = new AtomicLong(0L) +// def genPath(): String 
= s"path_${increment}" +// private def increment(): Long = { +// counter.incrementAndGet() +// } +//} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataUpdatable.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataUpdatable.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataUpdatable.scala new file mode 100644 index 0000000..61e8413 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataUpdatable.scala @@ -0,0 +1,30 @@ +///* +//Licensed to the Apache Software Foundation (ASF) under one +//or more contributor license agreements. See the NOTICE file +//distributed with this work for additional information +//regarding copyright ownership. The ASF licenses this file +//to you under the Apache License, Version 2.0 (the +//"License"); you may not use this file except in compliance +//with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, +//software distributed under the License is distributed on an +//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +//KIND, either express or implied. See the License for the +//specific language governing permissions and limitations +//under the License. 
+//*/ +//package org.apache.griffin.measure.data.connector.cache +// +//import org.apache.spark.rdd.RDD +// +//trait DataUpdatable { +// +// def cleanOldData(): Unit = {} +// +// def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {} +// def updateAllOldData(oldRdd: RDD[Map[String, Any]]): Unit = {} +// +//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/HiveCacheDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/HiveCacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/HiveCacheDataConnector.scala new file mode 100644 index 0000000..4c7b45b --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/HiveCacheDataConnector.scala @@ -0,0 +1,351 @@ +///* +//Licensed to the Apache Software Foundation (ASF) under one +//or more contributor license agreements. See the NOTICE file +//distributed with this work for additional information +//regarding copyright ownership. The ASF licenses this file +//to you under the Apache License, Version 2.0 (the +//"License"); you may not use this file except in compliance +//with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, +//software distributed under the License is distributed on an +//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +//KIND, either express or implied. See the License for the +//specific language governing permissions and limitations +//under the License. 
+//*/ +//package org.apache.griffin.measure.data.connector.cache +// +//import java.util.concurrent.TimeUnit +// +//import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache} +//import org.apache.griffin.measure.config.params.user.DataCacheParam +//import org.apache.griffin.measure.result.TimeStampInfo +//import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, JsonUtil, TimeUtil} +//import org.apache.spark.rdd.RDD +//import org.apache.spark.sql.SQLContext +//import org.apache.spark.sql.hive.HiveContext +// +//import scala.util.{Success, Try} +// +//case class HiveCacheDataConnector(sqlContext: SQLContext, dataCacheParam: DataCacheParam +// ) extends CacheDataConnector { +// +// if (!sqlContext.isInstanceOf[HiveContext]) { +// throw new Exception("hive context not prepared!") +// } +// +// val config = dataCacheParam.config +// val InfoPath = "info.path" +// val cacheInfoPath: String = config.getOrElse(InfoPath, defCacheInfoPath).toString +// +// val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new") +// val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old") +// +// val timeRangeParam: List[String] = if (dataCacheParam.timeRange != null) dataCacheParam.timeRange else Nil +// val deltaTimeRange: (Long, Long) = (timeRangeParam ::: List("0", "0")) match { +// case s :: e :: _ => { +// val ns = TimeUtil.milliseconds(s) match { +// case Some(n) if (n < 0) => n +// case _ => 0 +// } +// val ne = TimeUtil.milliseconds(e) match { +// case Some(n) if (n < 0) => n +// case _ => 0 +// } +// (ns, ne) +// } +// case _ => (0, 0) +// } +// +// val Database = "database" +// val database: String = config.getOrElse(Database, "").toString +// val TableName = "table.name" +// val tableName: String = config.get(TableName) match { +// case Some(s: String) if (s.nonEmpty) => s +// case _ => throw new Exception("invalid table.name!") +// } +// val ParentPath = "parent.path" +// val parentPath: String = config.get(ParentPath) 
match { +// case Some(s: String) => s +// case _ => throw new Exception("invalid parent.path!") +// } +// val tablePath = HdfsUtil.getHdfsFilePath(parentPath, tableName) +// +// val concreteTableName = if (dbPrefix) s"${database}.${tableName}" else tableName +// +// val ReadyTimeInterval = "ready.time.interval" +// val ReadyTimeDelay = "ready.time.delay" +// val readyTimeInterval: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeInterval, "1m").toString).getOrElse(60000L) +// val readyTimeDelay: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeDelay, "1m").toString).getOrElse(60000L) +// +// val TimeStampColumn: String = TimeStampInfo.key +// val PayloadColumn: String = "payload" +// +//// type Schema = (Long, String) +// val schema: List[(String, String)] = List( +// (TimeStampColumn, "bigint"), +// (PayloadColumn, "string") +// ) +// val schemaName = schema.map(_._1) +// +//// type Partition = (Long, Long) +// val partition: List[(String, String, String)] = List( +// ("hr", "bigint", "hour"), +// ("min", "bigint", "min") +// ) +// val partitionName = partition.map(_._1) +// +// private val fieldSep = """|""" +// private val rowSep = """\n""" +// private val rowSepLiteral = "\n" +// +// private def dbPrefix(): Boolean = { +// database.nonEmpty && !database.equals("default") +// } +// +// private def tableExists(): Boolean = { +// Try { +// if (dbPrefix) { +// sqlContext.tables(database).filter(tableExistsSql).collect.size +// } else { +// sqlContext.tables().filter(tableExistsSql).collect.size +// } +// } match { +// case Success(s) => s > 0 +// case _ => false +// } +// } +// +// override def init(): Unit = { +// try { +// if (tableExists) { +// // drop exist table +// val dropSql = s"""DROP TABLE ${concreteTableName}""" +// sqlContext.sql(dropSql) +// } +// +// val colsSql = schema.map { field => +// s"`${field._1}` ${field._2}" +// }.mkString(", ") +// val partitionsSql = partition.map { partition => +// s"`${partition._1}` ${partition._2}" +// 
}.mkString(", ") +// val sql = s"""CREATE EXTERNAL TABLE IF NOT EXISTS ${concreteTableName} +// |(${colsSql}) PARTITIONED BY (${partitionsSql}) +// |ROW FORMAT DELIMITED +// |FIELDS TERMINATED BY '${fieldSep}' +// |LINES TERMINATED BY '${rowSep}' +// |STORED AS TEXTFILE +// |LOCATION '${tablePath}'""".stripMargin +// sqlContext.sql(sql) +// } catch { +// case e: Throwable => throw e +// } +// } +// +// def available(): Boolean = { +// true +// } +// +// private def encode(data: Map[String, Any], ms: Long): Option[List[Any]] = { +// try { +// Some(schema.map { field => +// val (name, _) = field +// name match { +// case TimeStampColumn => ms +// case PayloadColumn => JsonUtil.toJson(data) +// case _ => null +// } +// }) +// } catch { +// case _ => None +// } +// } +// +// private def decode(data: List[Any], updateTimeStamp: Boolean): Option[Map[String, Any]] = { +// val dataMap = schemaName.zip(data).toMap +// dataMap.get(PayloadColumn) match { +// case Some(v: String) => { +// try { +// val map = JsonUtil.toAnyMap(v) +// val resMap = if (updateTimeStamp) { +// dataMap.get(TimeStampColumn) match { +// case Some(t) => map + (TimeStampColumn -> t) +// case _ => map +// } +// } else map +// Some(resMap) +// } catch { +// case _ => None +// } +// } +// case _ => None +// } +// } +// +// def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit = { +// val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS) +// if (newCacheLocked) { +// try { +// val ptns = getPartition(ms) +// val ptnsPath = genPartitionHdfsPath(ptns) +// val dirPath = s"${tablePath}/${ptnsPath}" +// val fileName = s"${ms}" +// val filePath = HdfsUtil.getHdfsFilePath(dirPath, fileName) +// +// // encode data +// val dataRdd: RDD[List[Any]] = rdd.flatMap(encode(_, ms)) +// +// // save data +// val recordRdd: RDD[String] = dataRdd.map { dt => +// dt.map(_.toString).mkString(fieldSep) +// } +// +// val dumped = if (!recordRdd.isEmpty) { +// HdfsFileDumpUtil.dump(filePath, recordRdd, rowSepLiteral) +// 
} else false +// +// // add partition +// if (dumped) { +// val sql = addPartitionSql(concreteTableName, ptns) +// sqlContext.sql(sql) +// } +// +// // submit ms +// submitCacheTime(ms) +// submitReadyTime(ms) +// } catch { +// case e: Throwable => error(s"save data error: ${e.getMessage}") +// } finally { +// newCacheLock.unlock() +// } +// } +// } +// +// def readData(): Try[RDD[Map[String, Any]]] = Try { +// val timeRange = TimeInfoCache.getTimeRange +// submitLastProcTime(timeRange._2) +// +// val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2) +// submitCleanTime(reviseTimeRange._1) +// +// // read directly through partition info +// val partitionRange = getPartitionRange(reviseTimeRange._1, reviseTimeRange._2) +// val sql = selectSql(concreteTableName, partitionRange) +// val df = sqlContext.sql(sql) +// +// // decode data +// df.flatMap { row => +// val dt = schemaName.map { sn => +// row.getAs[Any](sn) +// } +// decode(dt, true) +// } +// } +// +// override def cleanOldData(): Unit = { +// val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS) +// if (oldCacheLocked) { +// try { +// val cleanTime = readCleanTime() +// cleanTime match { +// case Some(ct) => { +// // drop partition +// val bound = getPartition(ct) +// val sql = dropPartitionSql(concreteTableName, bound) +// sqlContext.sql(sql) +// } +// case _ => { +// // do nothing +// } +// } +// } catch { +// case e: Throwable => error(s"clean old data error: ${e.getMessage}") +// } finally { +// oldCacheLock.unlock() +// } +// } +// } +// +// override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = { +// // parallel process different time groups, lock is unnecessary +// val ptns = getPartition(t) +// val ptnsPath = genPartitionHdfsPath(ptns) +// val dirPath = s"${tablePath}/${ptnsPath}" +// val fileName = s"${t}" +// val filePath = HdfsUtil.getHdfsFilePath(dirPath, fileName) +// +// try { +// // remove out time old data +// 
HdfsFileDumpUtil.remove(dirPath, fileName, true) +// +// // save updated old data +// if (oldData.size > 0) { +// val recordDatas = oldData.flatMap { dt => +// encode(dt, t) +// } +// val records: Iterable[String] = recordDatas.map { dt => +// dt.map(_.toString).mkString(fieldSep) +// } +// val dumped = HdfsFileDumpUtil.dump(filePath, records, rowSepLiteral) +// } +// } catch { +// case e: Throwable => error(s"update old data error: ${e.getMessage}") +// } +// } +// +// override protected def genCleanTime(ms: Long): Long = { +// val minPartition = partition.last +// val t1 = TimeUtil.timeToUnit(ms, minPartition._3) +// val t2 = TimeUtil.timeFromUnit(t1, minPartition._3) +// t2 +// } +// +// private def getPartition(ms: Long): List[(String, Any)] = { +// partition.map { p => +// val (name, _, unit) = p +// val t = TimeUtil.timeToUnit(ms, unit) +// (name, t) +// } +// } +// private def getPartitionRange(ms1: Long, ms2: Long): List[(String, (Any, Any))] = { +// partition.map { p => +// val (name, _, unit) = p +// val t1 = TimeUtil.timeToUnit(ms1, unit) +// val t2 = TimeUtil.timeToUnit(ms2, unit) +// (name, (t1, t2)) +// } +// } +// +// private def genPartitionHdfsPath(partition: List[(String, Any)]): String = { +// partition.map(prtn => s"${prtn._1}=${prtn._2}").mkString("/") +// } +// private def addPartitionSql(tbn: String, partition: List[(String, Any)]): String = { +// val partitionSql = partition.map(ptn => (s"`${ptn._1}` = ${ptn._2}")).mkString(", ") +// val sql = s"""ALTER TABLE ${tbn} ADD IF NOT EXISTS PARTITION (${partitionSql})""" +// sql +// } +// private def selectSql(tbn: String, partitionRange: List[(String, (Any, Any))]): String = { +// val clause = partitionRange.map { pr => +// val (name, (r1, r2)) = pr +// s"""`${name}` BETWEEN '${r1}' and '${r2}'""" +// }.mkString(" AND ") +// val whereClause = if (clause.nonEmpty) s"WHERE ${clause}" else "" +// val sql = s"""SELECT * FROM ${tbn} ${whereClause}""" +// sql +// } +// private def dropPartitionSql(tbn: 
String, partition: List[(String, Any)]): String = { +// val partitionSql = partition.map(ptn => (s"PARTITION ( `${ptn._1}` < '${ptn._2}' ) ")).mkString(", ") +// val sql = s"""ALTER TABLE ${tbn} DROP ${partitionSql}""" +// println(sql) +// sql +// } +// +// private def tableExistsSql(): String = { +// s"tableName LIKE '${tableName}'" +// } +// +//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/TextCacheDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/TextCacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/TextCacheDataConnector.scala new file mode 100644 index 0000000..0daf2d9 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/TextCacheDataConnector.scala @@ -0,0 +1,311 @@ +///* +//Licensed to the Apache Software Foundation (ASF) under one +//or more contributor license agreements. See the NOTICE file +//distributed with this work for additional information +//regarding copyright ownership. The ASF licenses this file +//to you under the Apache License, Version 2.0 (the +//"License"); you may not use this file except in compliance +//with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, +//software distributed under the License is distributed on an +//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +//KIND, either express or implied. See the License for the +//specific language governing permissions and limitations +//under the License. 
+//*/ +//package org.apache.griffin.measure.data.connector.cache +// +//import java.util.concurrent.TimeUnit +// +//import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache} +//import org.apache.griffin.measure.config.params.user.DataCacheParam +//import org.apache.griffin.measure.result.TimeStampInfo +//import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, JsonUtil, TimeUtil} +//import org.apache.spark.rdd.RDD +//import org.apache.spark.sql.SQLContext +// +//import scala.util.Try +// +//case class TextCacheDataConnector(sqlContext: SQLContext, dataCacheParam: DataCacheParam +// ) extends CacheDataConnector { +// +// val config = dataCacheParam.config +// val InfoPath = "info.path" +// val cacheInfoPath: String = config.getOrElse(InfoPath, defCacheInfoPath).toString +// +// val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new") +// val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old") +// +// val timeRangeParam: List[String] = if (dataCacheParam.timeRange != null) dataCacheParam.timeRange else Nil +// val deltaTimeRange: (Long, Long) = (timeRangeParam ::: List("0", "0")) match { +// case s :: e :: _ => { +// val ns = TimeUtil.milliseconds(s) match { +// case Some(n) if (n < 0) => n +// case _ => 0 +// } +// val ne = TimeUtil.milliseconds(e) match { +// case Some(n) if (n < 0) => n +// case _ => 0 +// } +// (ns, ne) +// } +// case _ => (0, 0) +// } +// +// val FilePath = "file.path" +// val filePath: String = config.get(FilePath) match { +// case Some(s: String) => s +// case _ => throw new Exception("invalid file.path!") +// } +// +// val ReadyTimeInterval = "ready.time.interval" +// val ReadyTimeDelay = "ready.time.delay" +// val readyTimeInterval: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeInterval, "1m").toString).getOrElse(60000L) +// val readyTimeDelay: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeDelay, "1m").toString).getOrElse(60000L) +// +//// val TimeStampColumn: 
String = TimeStampInfo.key +//// val PayloadColumn: String = "payload" +// +// // cache schema: Long, String +//// val fields = List[StructField]( +//// StructField(TimeStampColumn, LongType), +//// StructField(PayloadColumn, StringType) +//// ) +//// val schema = StructType(fields) +// +// // case class CacheData(time: Long, payload: String) { +// // def getTime(): Long = time +// // def getPayload(): String = payload +// // } +// +// private val rowSepLiteral = "\n" +// +// val partitionUnits: List[String] = List("hour", "min") +// +// override def init(): Unit = { +// // do nothing +// } +// +// def available(): Boolean = { +// true +// } +// +// private def encode(data: Map[String, Any], ms: Long): Option[String] = { +// try { +// val map = data + (TimeStampInfo.key -> ms) +// Some(JsonUtil.toJson(map)) +// } catch { +// case _: Throwable => None +// } +// } +// +// private def decode(data: String): Option[Map[String, Any]] = { +// try { +// Some(JsonUtil.toAnyMap(data)) +// } catch { +// case _: Throwable => None +// } +// } +// +// def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit = { +// val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS) +// if (newCacheLocked) { +// try { +// val ptns = getPartition(ms) +// val ptnsPath = genPartitionHdfsPath(ptns) +// val dirPath = s"${filePath}/${ptnsPath}" +// val dataFileName = s"${ms}" +// val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName) +// +// // encode data +// val dataRdd: RDD[String] = rdd.flatMap(encode(_, ms)) +// +// // save data +// val dumped = if (!dataRdd.isEmpty) { +// HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral) +// } else false +// +// // submit ms +// submitCacheTime(ms) +// submitReadyTime(ms) +// } catch { +// case e: Throwable => error(s"save data error: ${e.getMessage}") +// } finally { +// newCacheLock.unlock() +// } +// } +// } +// +// def readData(): Try[RDD[Map[String, Any]]] = Try { +// val timeRange = TimeInfoCache.getTimeRange +// 
submitLastProcTime(timeRange._2) +// +// val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2) +// submitCleanTime(reviseTimeRange._1) +// +// // read directly through partition info +// val partitionRanges = getPartitionRange(reviseTimeRange._1, reviseTimeRange._2) +// println(s"read time ranges: ${reviseTimeRange}") +// println(s"read partition ranges: ${partitionRanges}") +// +// // list partition paths +// val partitionPaths = listPathsBetweenRanges(filePath :: Nil, partitionRanges) +// +// if (partitionPaths.isEmpty) { +// sqlContext.sparkContext.emptyRDD[Map[String, Any]] +// } else { +// val filePaths = partitionPaths.mkString(",") +// val rdd = sqlContext.sparkContext.textFile(filePaths) +// +// // decode data +// rdd.flatMap { row => +// decode(row) +// } +// } +// } +// +// override def cleanOldData(): Unit = { +// val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS) +// if (oldCacheLocked) { +// try { +// val cleanTime = readCleanTime() +// cleanTime match { +// case Some(ct) => { +// // drop partitions +// val bounds = getPartition(ct) +// +// // list partition paths +// val earlierPaths = listPathsEarlierThanBounds(filePath :: Nil, bounds) +// +// // delete out time data path +// earlierPaths.foreach { path => +// println(s"delete hdfs path: ${path}") +// HdfsUtil.deleteHdfsPath(path) +// } +// } +// case _ => { +// // do nothing +// } +// } +// } catch { +// case e: Throwable => error(s"clean old data error: ${e.getMessage}") +// } finally { +// oldCacheLock.unlock() +// } +// } +// } +// +// override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = { +// // parallel process different time groups, lock is unnecessary +// val ptns = getPartition(t) +// val ptnsPath = genPartitionHdfsPath(ptns) +// val dirPath = s"${filePath}/${ptnsPath}" +// val dataFileName = s"${t}" +// val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName) +// +// try { +// // remove out time old data 
+// HdfsFileDumpUtil.remove(dirPath, dataFileName, true) +// +// // save updated old data +// if (oldData.size > 0) { +// val recordDatas = oldData.flatMap { dt => +// encode(dt, t) +// } +// val dumped = HdfsFileDumpUtil.dump(dataFilePath, recordDatas, rowSepLiteral) +// } +// } catch { +// case e: Throwable => error(s"update old data error: ${e.getMessage}") +// } +// } +// +// override protected def genCleanTime(ms: Long): Long = { +// val minPartitionUnit = partitionUnits.last +// val t1 = TimeUtil.timeToUnit(ms, minPartitionUnit) +// val t2 = TimeUtil.timeFromUnit(t1, minPartitionUnit) +// t2 +// } +// +// private def getPartition(ms: Long): List[Long] = { +// partitionUnits.map { unit => +// TimeUtil.timeToUnit(ms, unit) +// } +// } +// private def getPartitionRange(ms1: Long, ms2: Long): List[(Long, Long)] = { +// partitionUnits.map { unit => +// val t1 = TimeUtil.timeToUnit(ms1, unit) +// val t2 = TimeUtil.timeToUnit(ms2, unit) +// (t1, t2) +// } +// } +// +// private def genPartitionHdfsPath(partition: List[Long]): String = { +// partition.map(prtn => s"${prtn}").mkString("/") +// } +// +// private def str2Long(str: String): Option[Long] = { +// try { +// Some(str.toLong) +// } catch { +// case e: Throwable => None +// } +// } +// +// // here the range means [min, max], but the best range should be (min, max] +// private def listPathsBetweenRanges(paths: List[String], +// partitionRanges: List[(Long, Long)] +// ): List[String] = { +// partitionRanges match { +// case Nil => paths +// case head :: tail => { +// val (lb, ub) = head +// val curPaths = paths.flatMap { path => +// val names = HdfsUtil.listSubPaths(path, "dir").toList +// names.filter { name => +// str2Long(name) match { +// case Some(t) => (t >= lb) && (t <= ub) +// case _ => false +// } +// }.map(HdfsUtil.getHdfsFilePath(path, _)) +// } +// listPathsBetweenRanges(curPaths, tail) +// } +// } +// } +// +// private def listPathsEarlierThanBounds(paths: List[String], bounds: List[Long] +// ): 
List[String] = { +// bounds match { +// case Nil => paths +// case head :: tail => { +// val earlierPaths = paths.flatMap { path => +// val names = HdfsUtil.listSubPaths(path, "dir").toList +// names.filter { name => +// str2Long(name) match { +// case Some(t) => (t < head) +// case _ => false +// } +// }.map(HdfsUtil.getHdfsFilePath(path, _)) +// } +// val equalPaths = paths.flatMap { path => +// val names = HdfsUtil.listSubPaths(path, "dir").toList +// names.filter { name => +// str2Long(name) match { +// case Some(t) => (t == head) +// case _ => false +// } +// }.map(HdfsUtil.getHdfsFilePath(path, _)) +// } +// +// tail match { +// case Nil => earlierPaths +// case _ => earlierPaths ::: listPathsEarlierThanBounds(equalPaths, tail) +// } +// } +// } +// } +// +//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala new file mode 100644 index 0000000..41de217 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala @@ -0,0 +1,70 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
/**
 * Streaming connector base for Kafka sources.
 *
 * Reads the Kafka client settings (`kafka.config`) and the comma separated
 * topic list (`topics`) from the connector configuration, creates the input
 * stream through [[createDStream]] and, for every micro batch, hands the
 * transformed data frame to the data source cache.
 */
trait KafkaStreamingDataConnector extends StreamingDataConnector {

  /** Decoder type used for message keys. */
  type KD <: Decoder[K]
  /** Decoder type used for message values. */
  type VD <: Decoder[V]

  val config = dcParam.config

  // configuration keys
  val KafkaConfig = "kafka.config"
  val Topics = "topics"

  val kafkaConfig = config.getAnyRef(KafkaConfig, Map[String, String]())
  val topics = config.getString(Topics, "")

  /** Always reports the connector as available. */
  def available(): Boolean = true

  /**
   * Creates the stream and registers the per-batch processing.
   *
   * A failure while creating the stream is rethrown, so a misconfigured
   * connector fails at start-up instead of silently consuming nothing.
   */
  def init(): Unit = {
    val ds = stream() match {
      case Success(dstream) => dstream
      case Failure(ex) => throw ex
    }
    ds.foreachRDD((rdd, time) => {
      // the batch time is used as the timestamp of the cached data
      val ms = time.milliseconds

      val dfOpt = transform(rdd)

      // NOTE(review): preProcess is inherited and not visible here —
      // presumably applies the configured pre-processing rules; confirm.
      val preDfOpt = preProcess(dfOpt, ms)

      // save data frame
      dataSourceCacheOpt.foreach(_.saveData(preDfOpt, ms))
    })
  }

  /**
   * Builds the Kafka input stream for the configured topics.
   *
   * Topic names are trimmed and blank entries are dropped, so values such as
   * `"a, b"` or `"a,"` resolve to the intended topics. An empty topic list
   * yields a `Failure` with an explicit message rather than a subscription
   * to the empty topic name.
   *
   * @return the created stream, or the failure raised while creating it
   */
  def stream(): Try[InputDStream[(K, V)]] = Try {
    val topicSet = topics.split(",").map(_.trim).filter(_.nonEmpty).toSet
    require(topicSet.nonEmpty, s"no kafka topic configured in [${Topics}]")
    createDStream(topicSet)
  }

  /**
   * Creates the concrete Kafka stream for the given topics.
   *
   * @param topicSet non-empty set of topic names
   */
  protected def createDStream(topicSet: Set[String]): InputDStream[(K, V)]
}
/**
 * Kafka streaming connector for string keys and string values.
 *
 * Every message value becomes one row of a single-column (`value`) data frame.
 *
 * @param sqlContext context used to build the data frames
 * @param ssc        streaming context the Kafka stream is attached to
 * @param dqEngines  engines made available to the connector
 * @param dcParam    connector configuration
 */
case class KafkaStreamingStringDataConnector(sqlContext: SQLContext,
                                             @transient ssc: StreamingContext,
                                             dqEngines: DqEngines,
                                             dcParam: DataConnectorParam
                                            ) extends KafkaStreamingDataConnector {
  type K = String
  type KD = StringDecoder
  type V = String
  type VD = StringDecoder

  val valueColName = "value"
  val schema = StructType(Array(
    StructField(valueColName, StringType)
  ))

  /** Creates a direct Kafka stream over `topicSet` using the configured Kafka settings. */
  def createDStream(topicSet: Set[String]): InputDStream[(K, V)] = {
    KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaConfig, topicSet)
  }

  /**
   * Converts one batch of Kafka messages into a data frame of message values.
   *
   * @param rdd batch of (key, value) pairs; the key is ignored
   * @return `None` for an empty batch or when the conversion fails,
   *         otherwise the single-column data frame
   */
  def transform(rdd: RDD[(K, V)]): Option[DataFrame] = {
    import scala.util.control.NonFatal

    if (rdd.isEmpty) None
    else {
      try {
        val rowRdd = rdd.map(d => Row(d._2))
        Some(sqlContext.createDataFrame(rowRdd, schema))
      } catch {
        // best effort: a failing batch is reported with its cause and skipped,
        // while fatal errors (e.g. OutOfMemoryError) are left to propagate
        case NonFatal(e) =>
          error(s"streaming data transform fails: ${e.getMessage}")
          None
      }
    }
  }
}
/**
 * Contract of a connector that receives its data from a stream of
 * key/value pairs instead of loading it on demand.
 */
trait StreamingDataConnector extends DataConnector {

  /** Key type of the streamed records. */
  type K

  /** Value type of the streamed records. */
  type V

  /** Cache that receives the streamed batches; unset until one is assigned. */
  var dataSourceCacheOpt: Option[DataSourceCache] = None

  /** Streaming connectors serve no data directly: always `None`. */
  def data(ms: Long): Option[DataFrame] = None

  /** Turns one batch of streamed records into a data frame, if possible. */
  def transform(rdd: RDD[(K, V)]): Option[DataFrame]

  /** Creates the underlying input stream, or reports why it could not be created. */
  protected def stream(): Try[InputDStream[(K, V)]]

}
/**
 * Bookkeeping of the time markers (cache, last processed, ready and clean
 * time) that a data cache publishes through [[InfoCacheInstance]].
 *
 * Implementations provide the cache info path and the ready-time settings;
 * all keys are derived from `TimeInfoCache.infoPath` and `cacheInfoPath`.
 */
trait DataCacheable {

  val cacheInfoPath: String
  val readyTimeInterval: Long
  val readyTimeDelay: Long

  def selfCacheInfoPath = s"${TimeInfoCache.infoPath}/${cacheInfoPath}"

  // info keys of this cache
  def selfCacheTime = TimeInfoCache.cacheTime(selfCacheInfoPath)
  def selfLastProcTime = TimeInfoCache.lastProcTime(selfCacheInfoPath)
  def selfReadyTime = TimeInfoCache.readyTime(selfCacheInfoPath)
  def selfCleanTime = TimeInfoCache.cleanTime(selfCacheInfoPath)

  /** Publishes a single `key -> time` entry. */
  private def submitTime(key: String, ms: Long): Unit = {
    InfoCacheInstance.cacheInfo(Map[String, String](key -> ms.toString))
  }

  /** Publishes `ms` as the cache time. */
  protected def submitCacheTime(ms: Long): Unit = submitTime(selfCacheTime, ms)

  /**
   * Publishes the ready time, i.e. `ms` minus the ready delay, but only when
   * that time falls exactly on a multiple of the ready interval.
   */
  protected def submitReadyTime(ms: Long): Unit = {
    val curReadyTime = ms - readyTimeDelay
    // NOTE(review): a ready interval of 0 makes the modulo throw
    // ArithmeticException — confirm the configuration can never produce 0.
    if (curReadyTime % readyTimeInterval == 0) {
      submitTime(selfReadyTime, curReadyTime)
    }
  }

  /** Publishes `ms` as the last processed time. */
  protected def submitLastProcTime(ms: Long): Unit = submitTime(selfLastProcTime, ms)

  /** Publishes the clean time derived from `ms` through [[genCleanTime]]. */
  protected def submitCleanTime(ms: Long): Unit = submitTime(selfCleanTime, genCleanTime(ms))

  /** Maps a time to the clean time to publish; the identity unless overridden. */
  protected def genCleanTime(ms: Long): Long = ms

  /**
   * Reads the published clean time.
   *
   * @return the clean time, or `None` when it is missing or is not a number
   */
  protected def readCleanTime(): Option[Long] = {
    val key = selfCleanTime
    InfoCacheInstance.readInfo(key :: Nil).get(key).flatMap { v =>
      // Try only captures non-fatal errors, unlike the former catch-all,
      // which also swallowed fatal ones
      scala.util.Try(v.toLong).toOption
    }
  }

}
/**
 * A named data source made of batch and streaming connectors plus an
 * optional cache for streamed data.
 *
 * On construction the cache is handed to every streaming connector, so the
 * batches they receive end up in it.
 *
 * @param sqlContext         context used to register and build data frames
 * @param name               temp table name the loaded data is registered under
 * @param dataConnectors     all connectors of this source
 * @param dataSourceCacheOpt cache of the streamed data, if any
 */
case class DataSource(sqlContext: SQLContext,
                      name: String,
                      dataConnectors: Seq[DataConnector],
                      dataSourceCacheOpt: Option[DataSourceCache]
                     ) extends Loggable with Serializable {

  import scala.util.control.NonFatal

  val batchDataConnectors = DataConnectorFactory.filterBatchDataConnectors(dataConnectors)
  val streamingDataConnectors = DataConnectorFactory.filterStreamingDataConnectors(dataConnectors)
  streamingDataConnectors.foreach(_.dataSourceCacheOpt = dataSourceCacheOpt)

  /** Initializes the cache first, then every connector. */
  def init(): Unit = {
    dataSourceCacheOpt.foreach(_.init())
    dataConnectors.foreach(_.init())
  }

  /**
   * Loads the data for `ms` and registers it as temp table `name`.
   * When nothing can be loaded only a warning is logged.
   */
  def loadData(ms: Long): Unit = {
    data(ms) match {
      case Some(df) => df.registerTempTable(name)
      case None => warn(s"load data source [${name}] fails")
    }
  }

  /** Drops the temp table of this source; a failure is logged, not raised. */
  def dropTable(): Unit = {
    try {
      sqlContext.dropTempTable(name)
    } catch {
      case NonFatal(e) => warn(s"drop table [${name}] fails: ${e.getMessage}")
    }
  }

  /**
   * Collects the data of all batch connectors and of the cache.
   *
   * @return the union of everything available, `None` when there is no data
   */
  private def data(ms: Long): Option[DataFrame] = {
    val batchDataFrameOpt = batchDataConnectors.flatMap { dc =>
      dc.data(ms)
    }.reduceOption((a, b) => unionDataFrames(a, b))

    val cacheDataFrameOpt = dataSourceCacheOpt.flatMap(_.readData())

    (batchDataFrameOpt, cacheDataFrameOpt) match {
      case (Some(bdf), Some(cdf)) => Some(unionDataFrames(bdf, cdf))
      case (Some(bdf), _) => Some(bdf)
      case (_, Some(cdf)) => Some(cdf)
      case _ => None
    }
  }

  /**
   * Appends `df2` to `df1`, re-ordering the columns of `df2` by name so they
   * line up with the schema of `df1`.
   *
   * Best effort: when the frames cannot be combined, `df1` is returned alone
   * and the dropped data is reported in the log.
   */
  private def unionDataFrames(df1: DataFrame, df2: DataFrame): DataFrame = {
    try {
      val cols = df1.columns
      val rdd2 = df2.map { row =>
        val values = cols.map { col =>
          row.getAs[Any](col)
        }
        Row(values: _*)
      }
      val ndf2 = sqlContext.createDataFrame(rdd2, df1.schema)
      df1 unionAll ndf2
    } catch {
      case NonFatal(e) =>
        warn(s"union data frames of data source [${name}] fails: ${e.getMessage}")
        df1
    }
  }

  /** Replaces the cached data of time `ms` with `df`, when a cache exists. */
  def updateData(df: DataFrame, ms: Long): Unit = {
    dataSourceCacheOpt.foreach(_.updateData(df, ms))
  }

  /** Replaces the cached data of every time in `dfMap`, when a cache exists. */
  def updateDataMap(dfMap: Map[Long, DataFrame]): Unit = {
    dataSourceCacheOpt.foreach(_.updateDataMap(dfMap))
  }

  /** Removes outdated data from the cache, when a cache exists. */
  def cleanOldData(): Unit = {
    dataSourceCacheOpt.foreach(_.cleanOldData())
  }

}
/**
 * HDFS backed cache of streamed data.
 *
 * Every batch is stored as JSON lines in the file
 * `<file.path>/<hour>/<min>/<sec>/<ms>`, where the directory levels are the
 * batch time expressed in each of [[partitionUnits]].
 *
 * Recognised `param` keys: `file.path`, `info.path`, `ready.time.interval`,
 * `ready.time.delay` and `time.range`.
 *
 * @param sqlContext context used to read the cached JSON
 * @param param      cache configuration
 * @param metricName metric name, part of the default file path
 * @param index      index of the data source, part of the default paths
 */
case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
                           metricName: String, index: Int
                          ) extends DataCacheable with Loggable with Serializable {

  import scala.util.control.NonFatal

  val name = ""

  val _FilePath = "file.path"
  val _InfoPath = "info.path"
  val _ReadyTimeInterval = "ready.time.interval"
  val _ReadyTimeDelay = "ready.time.delay"
  val _TimeRange = "time.range"

  val defFilePath = s"hdfs:///griffin/cache/${metricName}/${index}"
  val defInfoPath = s"${index}"

  val filePath: String = param.getString(_FilePath, defFilePath)
  val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath)
  val readyTimeInterval: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)
  val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)

  /**
   * Offsets (start, end) added to the time range that is read; each is zero
   * or negative. Taken from the first two parsable string entries of
   * `time.range`; a missing, empty or unparsable setting yields `(0, 0)`.
   */
  val deltaTimeRange: (Long, Long) = {
    def negative(n: Long): Long = if (n <= 0) n else 0
    param.get(_TimeRange) match {
      // the element type is erased at runtime, so keep the string entries explicitly
      case Some(seq: Seq[_]) => {
        val nseq = seq.collect { case s: String => s }.flatMap(TimeUtil.milliseconds(_))
        val ns = negative(nseq.headOption.getOrElse(0L))
        // drop(1) is safe on an empty sequence, tail is not
        val ne = negative(nseq.drop(1).headOption.getOrElse(0L))
        (ns, ne)
      }
      case _ => (0L, 0L)
    }
  }

  val rowSepLiteral = "\n"
  val partitionUnits: List[String] = List("hour", "min", "sec")

  val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
  val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")

  /** Nothing to prepare. */
  def init(): Unit = ()

  /**
   * Stores the batch of time `ms` and publishes the cache and ready time.
   *
   * The times are published even when there is no data or the write fails.
   *
   * @param dfOpt batch data, `None` when the batch produced nothing
   * @param ms    batch time in milliseconds
   */
  def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
    dfOpt match {
      case Some(df) => {
        val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
        if (newCacheLocked) {
          try {
            val (_, _, dataFilePath) = dataFileLocation(ms)

            // transform data
            val dataRdd: RDD[String] = df.toJSON

            // save data
            if (!dataRdd.isEmpty) {
              HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral)
            }
          } catch {
            case NonFatal(e) => error(s"save data error: ${e.getMessage}")
          } finally {
            newCacheLock.unlock()
          }
        } else {
          warn(s"save data skipped, fail to get lock [${cacheInfoPath}.new]")
        }
      }
      case _ => {
        info(s"no data frame to save")
      }
    }

    // submit cache time and ready time
    submitCacheTime(ms)
    submitReadyTime(ms)
  }

  /**
   * Reads the cached data of the current time range, shifted by
   * [[deltaTimeRange]], and publishes the last processed and clean time.
   *
   * @return the cached data, `None` when no partition matches or reading fails
   */
  def readData(): Option[DataFrame] = {
    val timeRange = TimeInfoCache.getTimeRange
    submitLastProcTime(timeRange._2)

    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
    submitCleanTime(reviseTimeRange._1)

    // read directly through partition info
    val partitionRanges = getPartitionRange(reviseTimeRange._1, reviseTimeRange._2)
    println(s"read time ranges: ${reviseTimeRange}")
    println(s"read partition ranges: ${partitionRanges}")

    // list partition paths
    val partitionPaths = listPathsBetweenRanges(filePath :: Nil, partitionRanges)

    if (partitionPaths.isEmpty) {
      None
    } else {
      try {
        Some(sqlContext.read.json(partitionPaths: _*))
      } catch {
        case NonFatal(e) => {
          warn(s"read data source cache warn: ${e.getMessage}")
          None
        }
      }
    }
  }

  /**
   * Replaces the cached file of time `ms` with the content of `df`; an empty
   * `df` only removes the file.
   *
   * -- deprecated --
   */
  def updateData(df: DataFrame, ms: Long): Unit = {
    val (dirPath, dataFileName, dataFilePath) = dataFileLocation(ms)

    try {
      // collect before removing, so a failing job leaves the old file in place
      val arr = df.toJSON.collect
      val needSave = !arr.isEmpty

      // remove out time old data
      removeDataFile(dirPath, dataFileName)

      // save updated data
      if (needSave) {
        HdfsFileDumpUtil.dump(dataFilePath, arr, rowSepLiteral)
        println(s"update file path: ${dataFilePath}")
      }
    } catch {
      case NonFatal(e) => error(s"update data error: ${e.getMessage}")
    }
  }

  /**
   * Replaces the cached file of time `ms` with the content of `rdd`; a count
   * of zero only removes the file. `rdd` is unpersisted afterwards.
   *
   * @param rdd JSON records to store
   * @param ms  time of the cached file in milliseconds
   * @param cnt number of records in `rdd`, computed by the caller
   */
  def updateData(rdd: RDD[String], ms: Long, cnt: Long): Unit = {
    val (dirPath, dataFileName, dataFilePath) = dataFileLocation(ms)

    try {
      // remove out time old data
      removeDataFile(dirPath, dataFileName)

      // save updated data
      if (cnt > 0) {
        HdfsFileDumpUtil.dump(dataFilePath, rdd, rowSepLiteral)
        println(s"update file path: ${dataFilePath}")
      }
    } catch {
      case NonFatal(e) => error(s"update data error: ${e.getMessage}")
    } finally {
      rdd.unpersist()
    }
  }

  /**
   * Replaces the cached file of time `ms` with the given records; an empty
   * collection only removes the file.
   */
  def updateData(rdd: Iterable[String], ms: Long): Unit = {
    val (dirPath, dataFileName, dataFilePath) = dataFileLocation(ms)

    try {
      val needSave = !rdd.isEmpty

      // remove out time old data
      removeDataFile(dirPath, dataFileName)

      // save updated data
      if (needSave) {
        HdfsFileDumpUtil.dump(dataFilePath, rdd, rowSepLiteral)
        println(s"update file path: ${dataFilePath}")
      }
    } catch {
      case NonFatal(e) => error(s"update data error: ${e.getMessage}")
    }
  }

  /** Replaces the cached file of every time in `dfMap` with its data frame. */
  def updateDataMap(dfMap: Map[Long, DataFrame]): Unit = {
    // count every frame first, then rewrite the files
    val dataMap = dfMap.map { pair =>
      val (t, recs) = pair
      val rdd = recs.toJSON
      (t, rdd, rdd.count)
    }

    dataMap.foreach { pair =>
      val (t, rdd, cnt) = pair
      updateData(rdd, t, cnt)
    }
  }

  /**
   * Deletes every partition directory that is earlier than the published
   * clean time. Does nothing when no clean time has been published.
   */
  def cleanOldData(): Unit = {
    val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
    if (oldCacheLocked) {
      try {
        readCleanTime().foreach { ct =>
          // drop partitions
          val bounds = getPartition(ct)

          // list partition paths
          val earlierPaths = listPathsEarlierThanBounds(filePath :: Nil, bounds)

          // delete out time data path
          earlierPaths.foreach { path =>
            println(s"delete hdfs path: ${path}")
            HdfsUtil.deleteHdfsPath(path)
          }
        }
      } catch {
        case NonFatal(e) => error(s"clean old data error: ${e.getMessage}")
      } finally {
        oldCacheLock.unlock()
      }
    }
  }

  /** Truncates `ms` to the finest partition unit. */
  override protected def genCleanTime(ms: Long): Long = {
    val minPartitionUnit = partitionUnits.last
    val t1 = TimeUtil.timeToUnit(ms, minPartitionUnit)
    TimeUtil.timeFromUnit(t1, minPartitionUnit)
  }

  /** Directory, file name and full path of the cached file of time `ms`. */
  private def dataFileLocation(ms: Long): (String, String, String) = {
    val dirPath = s"${filePath}/${genPartitionHdfsPath(getPartition(ms))}"
    val dataFileName = s"${ms}"
    (dirPath, dataFileName, HdfsUtil.getHdfsFilePath(dirPath, dataFileName))
  }

  /** Removes a cached file and reports the removed path. */
  private def removeDataFile(dirPath: String, dataFileName: String): Unit = {
    HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
    println(s"remove file path: ${dirPath}/${dataFileName}")
  }

  /** `ms` expressed in every partition unit, coarsest first. */
  private def getPartition(ms: Long): List[Long] = {
    partitionUnits.map { unit =>
      TimeUtil.timeToUnit(ms, unit)
    }
  }

  /** Both times expressed in every partition unit, coarsest first. */
  private def getPartitionRange(ms1: Long, ms2: Long): List[(Long, Long)] = {
    partitionUnits.map { unit =>
      val t1 = TimeUtil.timeToUnit(ms1, unit)
      val t2 = TimeUtil.timeToUnit(ms2, unit)
      (t1, t2)
    }
  }

  /** Joins the partition values into a relative directory path. */
  private def genPartitionHdfsPath(partition: List[Long]): String = {
    partition.map(prtn => s"${prtn}").mkString("/")
  }

  /** Parses a directory name as a number; `None` when it is not one. */
  private def str2Long(str: String): Option[Long] = {
    scala.util.Try(str.toLong).toOption
  }

  /**
   * Sub directories of `paths` whose name is a number accepted by `accept`,
   * returned as full paths.
   */
  private def listSubDirPaths(paths: List[String])(accept: Long => Boolean): List[String] = {
    paths.flatMap { path =>
      HdfsUtil.listSubPathsByType(path, "dir").toList
        .filter(name => str2Long(name).exists(accept))
        .map(HdfsUtil.getHdfsFilePath(path, _))
    }
  }

  /**
   * Descends one directory level per range and keeps the directories whose
   * name lies within that range.
   */
  // here the range means [min, max], but the best range should be (min, max]
  @scala.annotation.tailrec
  private def listPathsBetweenRanges(paths: List[String],
                                     partitionRanges: List[(Long, Long)]
                                    ): List[String] = {
    partitionRanges match {
      case Nil => paths
      case (lb, ub) :: tail =>
        val curPaths = listSubDirPaths(paths)(t => (t >= lb) && (t <= ub))
        listPathsBetweenRanges(curPaths, tail)
    }
  }

  /**
   * Lists the directories that are strictly earlier than `bounds`: on every
   * level the directories below the bound, plus the earlier ones found below
   * the directory equal to the bound.
   */
  private def listPathsEarlierThanBounds(paths: List[String], bounds: List[Long]
                                        ): List[String] = {
    bounds match {
      case Nil => paths
      case head :: tail => {
        val earlierPaths = listSubDirPaths(paths)(_ < head)
        tail match {
          case Nil => earlierPaths
          case _ =>
            // only descend into the boundary directory when a finer level exists
            val equalPaths = listSubDirPaths(paths)(_ == head)
            earlierPaths ::: listPathsEarlierThanBounds(equalPaths, tail)
        }
      }
    }
  }
}
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.data.source + +import org.apache.griffin.measure.config.params.user._ +import org.apache.griffin.measure.data.connector.batch.BatchDataConnector +import org.apache.griffin.measure.data.connector.streaming.StreamingDataConnector +import org.apache.griffin.measure.data.connector.{DataConnector, DataConnectorFactory} +import org.apache.griffin.measure.log.Loggable +import org.apache.griffin.measure.process.engine.{DqEngine, DqEngines} +import org.apache.spark.sql.SQLContext +import org.apache.spark.streaming.StreamingContext + +import scala.util.{Success, Try} + +object DataSourceFactory extends Loggable { + + val HiveRegex = """^(?i)hive$""".r + val TextRegex = """^(?i)text$""".r + val AvroRegex = """^(?i)avro$""".r + + def genDataSources(sqlContext: SQLContext, ssc: StreamingContext, dqEngines: DqEngines, + dataSourceParams: Seq[DataSourceParam], metricName: String): Seq[DataSource] = { + dataSourceParams.zipWithIndex.flatMap { pair => + val (param, index) = pair + genDataSource(sqlContext, ssc, dqEngines, param, metricName, index) + } + } + + private def genDataSource(sqlContext: SQLContext, ssc: StreamingContext, + dqEngines: DqEngines, + dataSourceParam: DataSourceParam, + metricName: String, index: Int + ): Option[DataSource] = { + val name = dataSourceParam.name + val 
connectorParams = dataSourceParam.connectors + val cacheParam = dataSourceParam.cache + val dataConnectors = connectorParams.flatMap { connectorParam => + DataConnectorFactory.getDataConnector(sqlContext, ssc, dqEngines, connectorParam) match { + case Success(connector) => Some(connector) + case _ => None + } + } + val dataSourceCacheOpt = genDataSourceCache(sqlContext, cacheParam, metricName, index) + + Some(DataSource(sqlContext, name, dataConnectors, dataSourceCacheOpt)) + } + + private def genDataSourceCache(sqlContext: SQLContext, param: Map[String, Any], + metricName: String, index: Int + ) = { + if (param != null) { + try { + Some(DataSourceCache(sqlContext, param, metricName, index)) + } catch { + case e: Throwable => { + error(s"generate data source cache fails") + None + } + } + } else None + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala index 97786c4..431fe10 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala @@ -21,10 +21,12 @@ package org.apache.griffin.measure.persist import java.util.Date import org.apache.griffin.measure.result._ -import org.apache.griffin.measure.utils.HdfsUtil +import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil} import org.apache.spark.rdd.RDD +import org.apache.spark.sql.DataFrame import scala.util.Try +import org.apache.griffin.measure.utils.ParamUtil._ // persist result and data to hdfs case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist { @@ -34,17 +36,17 @@ case class HdfsPersist(config: Map[String, Any], 
metricName: String, timeStamp: val MaxLinesPerFile = "max.lines.per.file" val path = config.getOrElse(Path, "").toString - val maxPersistLines = try { config.getOrElse(MaxPersistLines, -1).toString.toInt } catch { case _ => -1 } - val maxLinesPerFile = try { config.getOrElse(MaxLinesPerFile, 10000).toString.toLong } catch { case _ => 10000 } + val maxPersistLines = config.getInt(MaxPersistLines, -1) + val maxLinesPerFile = config.getLong(MaxLinesPerFile, 10000) val separator = "/" val StartFile = filePath("_START") val FinishFile = filePath("_FINISH") - val ResultFile = filePath("_RESULT") + val MetricsFile = filePath("_METRICS") - val MissRecFile = filePath("_MISSREC") // optional - val MatchRecFile = filePath("_MATCHREC") // optional +// val MissRecFile = filePath("_MISSREC") // optional +// val MatchRecFile = filePath("_MATCHREC") // optional val LogFile = filePath("_LOG") @@ -56,7 +58,7 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp: } def available(): Boolean = { - (path.nonEmpty) && (maxPersistLines < Int.MaxValue) + path.nonEmpty } private def persistHead: String = { @@ -92,57 +94,141 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp: } } - def result(rt: Long, result: Result): Unit = { - try { - val resStr = result match { - case ar: AccuracyResult => { - s"match percentage: ${ar.matchPercentage}\ntotal count: ${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}" - } - case pr: ProfileResult => { - s"match percentage: ${pr.matchPercentage}\ntotal count: ${pr.getTotal}\nmiss count: ${pr.getMiss}, match count: ${pr.getMatch}" - } - case _ => { - s"result: ${result}" - } - } - HdfsUtil.writeContent(ResultFile, timeHead(rt) + resStr) - log(rt, resStr) +// def result(rt: Long, result: Result): Unit = { +// try { +// val resStr = result match { +// case ar: AccuracyResult => { +// s"match percentage: ${ar.matchPercentage}\ntotal count: ${ar.getTotal}\nmiss count: ${ar.getMiss}, 
match count: ${ar.getMatch}" +// } +// case pr: ProfileResult => { +// s"match percentage: ${pr.matchPercentage}\ntotal count: ${pr.getTotal}\nmiss count: ${pr.getMiss}, match count: ${pr.getMatch}" +// } +// case _ => { +// s"result: ${result}" +// } +// } +// HdfsUtil.writeContent(ResultFile, timeHead(rt) + resStr) +// log(rt, resStr) +// +// info(resStr) +// } catch { +// case e: Throwable => error(e.getMessage) +// } +// } - info(resStr) - } catch { - case e: Throwable => error(e.getMessage) - } + // need to avoid string too long +// private def rddRecords(records: RDD[String], path: String): Unit = { +// try { +// val recordCount = records.count +// val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount) +// if (count > 0) { +// val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt +// if (groupCount <= 1) { +// val recs = records.take(count.toInt) +// persistRecords(path, recs) +// } else { +// val groupedRecords: RDD[(Long, Iterable[String])] = +// records.zipWithIndex.flatMap { r => +// val gid = r._2 / maxLinesPerFile +// if (gid < groupCount) Some((gid, r._1)) else None +// }.groupByKey() +// groupedRecords.foreach { group => +// val (gid, recs) = group +// val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString) +// persistRecords(hdfsPath, recs) +// } +// } +// } +// } catch { +// case e: Throwable => error(e.getMessage) +// } +// } +// +// private def iterableRecords(records: Iterable[String], path: String): Unit = { +// try { +// val recordCount = records.size +// val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount) +// if (count > 0) { +// val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt +// if (groupCount <= 1) { +// val recs = records.take(count.toInt) +// persistRecords(path, recs) +// } else { +// val groupedRecords = records.grouped(groupCount).zipWithIndex +// groupedRecords.take(groupCount).foreach { group => +// val (recs, gid) 
= group +// val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString) +// persistRecords(hdfsPath, recs) +// } +// } +// } +// } catch { +// case e: Throwable => error(e.getMessage) +// } +// } +// +// def records(recs: RDD[String], tp: String): Unit = { +// tp match { +// case PersistDataType.MISS => rddRecords(recs, MissRecFile) +// case PersistDataType.MATCH => rddRecords(recs, MatchRecFile) +// case _ => {} +// } +// } +// +// def records(recs: Iterable[String], tp: String): Unit = { +// tp match { +// case PersistDataType.MISS => iterableRecords(recs, MissRecFile) +// case PersistDataType.MATCH => iterableRecords(recs, MatchRecFile) +// case _ => {} +// } +// } + + private def persistRecords(hdfsPath: String, records: Iterable[String]): Unit = { + val recStr = records.mkString("\n") + HdfsUtil.writeContent(hdfsPath, recStr) } - // need to avoid string too long - private def rddRecords(records: RDD[String], path: String): Unit = { + def log(rt: Long, msg: String): Unit = { try { - val recordCount = records.count - val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount) - if (count > 0) { - val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt - if (groupCount <= 1) { - val recs = records.take(count.toInt) - persistRecords(path, recs) - } else { - val groupedRecords: RDD[(Long, Iterable[String])] = - records.zipWithIndex.flatMap { r => - val gid = r._2 / maxLinesPerFile - if (gid < groupCount) Some((gid, r._1)) else None - }.groupByKey() - groupedRecords.foreach { group => - val (gid, recs) = group - val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString) - persistRecords(hdfsPath, recs) - } - } - } + val logStr = (if (isInit) persistHead else "") + timeHead(rt) + s"${msg}\n\n" + HdfsUtil.appendContent(LogFile, logStr) } catch { case e: Throwable => error(e.getMessage) } } - private def iterableRecords(records: Iterable[String], path: String): Unit = { + +// def persistRecords(df: 
DataFrame, name: String): Unit = { +// val records = df.toJSON +// val path = filePath(name) +// try { +// val recordCount = records.count +// val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount) +// if (count > 0) { +// val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt +// if (groupCount <= 1) { +// val recs = records.take(count.toInt) +// persistRecords(path, recs) +// } else { +// val groupedRecords: RDD[(Long, Iterable[String])] = +// records.zipWithIndex.flatMap { r => +// val gid = r._2 / maxLinesPerFile +// if (gid < groupCount) Some((gid, r._1)) else None +// }.groupByKey() +// groupedRecords.foreach { group => +// val (gid, recs) = group +// val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString) +// persistRecords(hdfsPath, recs) +// } +// } +// } +// } catch { +// case e: Throwable => error(e.getMessage) +// } +// } + + def persistRecords(records: Iterable[String], name: String): Unit = { + val path = filePath(name) try { val recordCount = records.size val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount) @@ -165,39 +251,35 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp: } } - def records(recs: RDD[String], tp: String): Unit = { - tp match { - case PersistType.MISS => rddRecords(recs, MissRecFile) - case PersistType.MATCH => rddRecords(recs, MatchRecFile) - case _ => {} - } - } - - def records(recs: Iterable[String], tp: String): Unit = { - tp match { - case PersistType.MISS => iterableRecords(recs, MissRecFile) - case PersistType.MATCH => iterableRecords(recs, MatchRecFile) - case _ => {} - } - } - -// def missRecords(records: RDD[String]): Unit = { -// rddRecords(records, MissRecFile) +// def persistMetrics(metrics: Seq[String], name: String): Unit = { +// val path = filePath(name) +// try { +// val recordCount = metrics.size +// val count = if (maxPersistLines < 0) recordCount else 
scala.math.min(maxPersistLines, recordCount) +// if (count > 0) { +// val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt +// if (groupCount <= 1) { +// val recs = metrics.take(count.toInt) +// persistRecords(path, recs) +// } else { +// val groupedRecords = metrics.grouped(groupCount).zipWithIndex +// groupedRecords.take(groupCount).foreach { group => +// val (recs, gid) = group +// val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString) +// persistRecords(hdfsPath, recs) +// } +// } +// } +// } catch { +// case e: Throwable => error(e.getMessage) +// } // } -// -// def matchRecords(records: RDD[String]): Unit = { -// rddRecords(records, MatchRecFile) -// } - - private def persistRecords(hdfsPath: String, records: Iterable[String]): Unit = { - val recStr = records.mkString("\n") - HdfsUtil.writeContent(hdfsPath, recStr) - } - def log(rt: Long, msg: String): Unit = { + def persistMetrics(metrics: Map[String, Any]): Unit = { + val json = JsonUtil.toJson(metrics) try { - val logStr = (if (isInit) persistHead else "") + timeHead(rt) + s"${msg}\n\n" - HdfsUtil.appendContent(LogFile, logStr) + info(s"${json}") + persistRecords(MetricsFile, json :: Nil) } catch { case e: Throwable => error(e.getMessage) }
