remove some unused files

Author: Lionel Liu <[email protected]>
Closes #152 from bhlx3lyx7/master. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/1f984da1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/1f984da1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/1f984da1 Branch: refs/heads/master Commit: 1f984da1aea86e8be507db37f426b5e28d0d81e8 Parents: 0dd1d35 Author: Lionel Liu <[email protected]> Authored: Mon Oct 30 16:08:14 2017 +0800 Committer: Lionel Liu <[email protected]> Committed: Mon Oct 30 16:08:14 2017 +0800 ---------------------------------------------------------------------- .../batch/KafkaCacheDirectDataConnector.scala | 125 ----- .../StreamingCacheDirectDataConnector.scala | 60 --- .../connector/cache/CacheDataConnector.scala | 33 -- .../data/connector/cache/DataCacheable.scala | 86 --- .../data/connector/cache/DataUpdatable.scala | 30 -- .../cache/HiveCacheDataConnector.scala | 351 ------------ .../cache/TextCacheDataConnector.scala | 311 ----------- .../measure/persist/OldHttpPersist.scala | 87 --- .../apache/griffin/measure/process/Algo.scala | 34 -- measure/src/test/resources/test-data.jsonFile | 3 - measure/src/test/resources/test-data1.jsonFile | 31 -- .../measure/cache/InfoCacheInstanceTest.scala | 78 --- .../griffin/measure/cache/ZKCacheLockTest.scala | 84 --- .../griffin/measure/cache/ZKInfoCacheTest.scala | 90 ---- .../measure/process/BatchProcessTest.scala | 146 ----- .../griffin/measure/process/JsonParseTest.scala | 531 ------------------- .../griffin/measure/process/JsonToStructs.scala | 85 --- .../measure/process/StreamingProcessTest.scala | 147 ----- .../apache/griffin/measure/sql/SqlTest.scala | 125 ----- .../griffin/measure/utils/JsonUtilTest.scala | 60 --- 20 files changed, 2497 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/KafkaCacheDirectDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/KafkaCacheDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/KafkaCacheDirectDataConnector.scala deleted file mode 100644 index 70ddcde..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/KafkaCacheDirectDataConnector.scala +++ /dev/null @@ -1,125 +0,0 @@ -///* -//Licensed to the Apache Software Foundation (ASF) under one -//or more contributor license agreements. See the NOTICE file -//distributed with this work for additional information -//regarding copyright ownership. The ASF licenses this file -//to you under the Apache License, Version 2.0 (the -//"License"); you may not use this file except in compliance -//with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -//Unless required by applicable law or agreed to in writing, -//software distributed under the License is distributed on an -//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -//KIND, either express or implied. See the License for the -//specific language governing permissions and limitations -//under the License. 
-//*/ -//package org.apache.griffin.measure.data.connector.direct -// -//import org.apache.griffin.measure.config.params.user.DataConnectorParam -//import org.apache.griffin.measure.data.connector.DataConnectorFactory -//import org.apache.griffin.measure.data.connector.cache.CacheDataConnector -//import org.apache.griffin.measure.data.connector.streaming.StreamingDataConnector -//import org.apache.griffin.measure.result._ -//import org.apache.griffin.measure.rule._ -//import org.apache.spark.rdd.RDD -//import org.apache.spark.sql.SQLContext -//import org.apache.spark.streaming.StreamingContext -// -//import scala.util.{Failure, Success, Try} -// -//case class KafkaCacheDirectDataConnector(@transient streamingDataConnectorTry: Try[StreamingDataConnector], -// cacheDataConnectorTry: Try[CacheDataConnector], -// dataConnectorParam: DataConnectorParam, -// ruleExprs: RuleExprs, -// constFinalExprValueMap: Map[String, Any] -// ) extends StreamingCacheDirectDataConnector { -// -// val cacheDataConnector: CacheDataConnector = cacheDataConnectorTry match { -// case Success(cntr) => cntr -// case Failure(ex) => throw ex -// } -// @transient val streamingDataConnector: StreamingDataConnector = streamingDataConnectorTry match { -// case Success(cntr) => cntr -// case Failure(ex) => throw ex -// } -// -// protected def transform(rdd: RDD[(streamingDataConnector.K, streamingDataConnector.V)], -// ms: Long -// ): RDD[Map[String, Any]] = { -// val dataInfoMap = DataInfo.cacheInfoList.map(_.defWrap).toMap + TimeStampInfo.wrap(ms) -// -// rdd.flatMap { kv => -// val msg = kv._2 -// -// val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(msg), ruleExprs.cacheExprs, constFinalExprValueMap) -// val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps) -// -// finalExprValueMaps.map { vm => -// vm ++ dataInfoMap -// } -// } -// } -// -// def metaData(): Try[Iterable[(String, String)]] = Try { -// Map.empty[String, String] -// } -// -// def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = Try { -// cacheDataConnector.readData match { -// case Success(rdd) => { -// rdd.flatMap { row => -// val finalExprValueMap = ruleExprs.finalCacheExprs.flatMap { expr => -// row.get(expr._id).flatMap { d => -// Some((expr._id, d)) -// } -// }.toMap -// -// val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info => -// row.get(info.key) match { -// case Some(d) => (info.key -> d) -// case _ => info.defWrap -// } -// }.toMap -// -// val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr => -// expr.calculate(finalExprValueMap) match { -// case Some(v) => Some(v.asInstanceOf[AnyRef]) -// case _ => None -// } -// } -// val key = toTuple(groupbyData) -// -// Some((key, (finalExprValueMap, dataInfoMap))) -// } -// } -// case Failure(ex) => throw ex -// } -// } -// -// override def cleanOldData(): Unit = { -// cacheDataConnector.cleanOldData -// } -// -// override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = { -// if (dataConnectorParam.getMatchOnce) { -// cacheDataConnector.updateOldData(t, oldData) -// } -// } -// -// override def updateAllOldData(oldRdd: RDD[Map[String, Any]]): Unit = { -// if (dataConnectorParam.getMatchOnce) { -// cacheDataConnector.updateAllOldData(oldRdd) -// } -// } -// -// private def toTuple[A <: AnyRef](as: Seq[A]): Product = { -// if (as.size > 0) { -// val tupleClass = Class.forName("scala.Tuple" + as.size) -// tupleClass.getConstructors.apply(0).newInstance(as: 
_*).asInstanceOf[Product] -// } else None -// } -// -//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/StreamingCacheDirectDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/StreamingCacheDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/StreamingCacheDirectDataConnector.scala deleted file mode 100644 index dddf430..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/StreamingCacheDirectDataConnector.scala +++ /dev/null @@ -1,60 +0,0 @@ -///* -//Licensed to the Apache Software Foundation (ASF) under one -//or more contributor license agreements. See the NOTICE file -//distributed with this work for additional information -//regarding copyright ownership. The ASF licenses this file -//to you under the Apache License, Version 2.0 (the -//"License"); you may not use this file except in compliance -//with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -//Unless required by applicable law or agreed to in writing, -//software distributed under the License is distributed on an -//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -//KIND, either express or implied. See the License for the -//specific language governing permissions and limitations -//under the License. -//*/ -//package org.apache.griffin.measure.data.connector.direct -// -//import org.apache.griffin.measure.data.connector.cache.CacheDataConnector -//import org.apache.griffin.measure.data.connector.streaming.StreamingDataConnector -//import org.apache.griffin.measure.result.{DataInfo, TimeStampInfo} -//import org.apache.griffin.measure.rule.ExprValueUtil -//import org.apache.spark.rdd.RDD -// -//import scala.util.{Failure, Success} -// -//trait StreamingCacheDirectDataConnector extends DirectDataConnector { -// -// val cacheDataConnector: CacheDataConnector -// @transient val streamingDataConnector: StreamingDataConnector -// -// def available(): Boolean = { -// cacheDataConnector.available && streamingDataConnector.available -// } -// -// def init(): Unit = { -// cacheDataConnector.init -// -// val ds = streamingDataConnector.stream match { -// case Success(dstream) => dstream -// case Failure(ex) => throw ex -// } -// -// ds.foreachRDD((rdd, time) => { -// val ms = time.milliseconds -// -// val valueMapRdd = transform(rdd, ms) -// -// // save data frame -// cacheDataConnector.saveData(valueMapRdd, ms) -// }) -// } -// -// protected def transform(rdd: RDD[(streamingDataConnector.K, streamingDataConnector.V)], -// ms: Long -// ): RDD[Map[String, Any]] -// -//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/CacheDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/CacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/CacheDataConnector.scala deleted file mode 100644 index 67dcc06..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/CacheDataConnector.scala +++ /dev/null @@ -1,33 +0,0 @@ -///* -//Licensed to the Apache Software Foundation (ASF) under one -//or more contributor license 
agreements. See the NOTICE file -//distributed with this work for additional information -//regarding copyright ownership. The ASF licenses this file -//to you under the Apache License, Version 2.0 (the -//"License"); you may not use this file except in compliance -//with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -//Unless required by applicable law or agreed to in writing, -//software distributed under the License is distributed on an -//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -//KIND, either express or implied. See the License for the -//specific language governing permissions and limitations -//under the License. -//*/ -//package org.apache.griffin.measure.data.connector.cache -// -//import org.apache.griffin.measure.data.connector.DataConnector -//import org.apache.spark.rdd.RDD -// -//import scala.util.Try -// -//trait CacheDataConnector extends DataConnector with DataCacheable with DataUpdatable { -// -// def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit -// -// def readData(): Try[RDD[Map[String, Any]]] -// -//} -// http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataCacheable.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataCacheable.scala deleted file mode 100644 index 79162be..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataCacheable.scala +++ /dev/null @@ -1,86 +0,0 @@ -///* -//Licensed to the Apache Software Foundation (ASF) under one -//or more contributor license agreements. See the NOTICE file -//distributed with this work for additional information -//regarding copyright ownership. The ASF licenses this file -//to you under the Apache License, Version 2.0 (the -//"License"); you may not use this file except in compliance -//with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -//Unless required by applicable law or agreed to in writing, -//software distributed under the License is distributed on an -//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -//KIND, either express or implied. See the License for the -//specific language governing permissions and limitations -//under the License. 
-//*/ -//package org.apache.griffin.measure.data.connector.cache -// -//import java.util.concurrent.atomic.AtomicLong -// -//import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache} -// -//trait DataCacheable { -// -// protected val defCacheInfoPath = PathCounter.genPath -// -// val cacheInfoPath: String -// val readyTimeInterval: Long -// val readyTimeDelay: Long -// -// def selfCacheInfoPath = s"${TimeInfoCache.infoPath}/${cacheInfoPath}" -// -// def selfCacheTime = TimeInfoCache.cacheTime(selfCacheInfoPath) -// def selfLastProcTime = TimeInfoCache.lastProcTime(selfCacheInfoPath) -// def selfReadyTime = TimeInfoCache.readyTime(selfCacheInfoPath) -// def selfCleanTime = TimeInfoCache.cleanTime(selfCacheInfoPath) -// -// protected def submitCacheTime(ms: Long): Unit = { -// val map = Map[String, String]((selfCacheTime -> ms.toString)) -// InfoCacheInstance.cacheInfo(map) -// } -// -// protected def submitReadyTime(ms: Long): Unit = { -// val curReadyTime = ms - readyTimeDelay -// if (curReadyTime % readyTimeInterval == 0) { -// val map = Map[String, String]((selfReadyTime -> curReadyTime.toString)) -// InfoCacheInstance.cacheInfo(map) -// } -// } -// -// protected def submitLastProcTime(ms: Long): Unit = { -// val map = Map[String, String]((selfLastProcTime -> ms.toString)) -// InfoCacheInstance.cacheInfo(map) -// } -// -// protected def submitCleanTime(ms: Long): Unit = { -// val cleanTime = genCleanTime(ms) -// val map = Map[String, String]((selfCleanTime -> cleanTime.toString)) -// InfoCacheInstance.cacheInfo(map) -// } -// -// protected def genCleanTime(ms: Long): Long = ms -// -// protected def readCleanTime(): Option[Long] = { -// val key = selfCleanTime -// val keys = key :: Nil -// InfoCacheInstance.readInfo(keys).get(key).flatMap { v => -// try { -// Some(v.toLong) -// } catch { -// case _ => None -// } -// } -// } -// -//} -// -//object PathCounter { -// private val counter: AtomicLong = new AtomicLong(0L) -// def genPath(): String = s"path_${increment}" -// private def increment(): Long = { -// counter.incrementAndGet() -// } -//} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataUpdatable.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataUpdatable.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataUpdatable.scala deleted file mode 100644 index 61e8413..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataUpdatable.scala +++ /dev/null @@ -1,30 +0,0 @@ -///* -//Licensed to the Apache Software Foundation (ASF) under one -//or more contributor license agreements. See the NOTICE file -//distributed with this work for additional information -//regarding copyright ownership. The ASF licenses this file -//to you under the Apache License, Version 2.0 (the -//"License"); you may not use this file except in compliance -//with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -//Unless required by applicable law or agreed to in writing, -//software distributed under the License is distributed on an -//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -//KIND, either express or implied. See the License for the -//specific language governing permissions and limitations -//under the License. 
-//*/ -//package org.apache.griffin.measure.data.connector.cache -// -//import org.apache.spark.rdd.RDD -// -//trait DataUpdatable { -// -// def cleanOldData(): Unit = {} -// -// def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {} -// def updateAllOldData(oldRdd: RDD[Map[String, Any]]): Unit = {} -// -//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/HiveCacheDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/HiveCacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/HiveCacheDataConnector.scala deleted file mode 100644 index 4c7b45b..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/HiveCacheDataConnector.scala +++ /dev/null @@ -1,351 +0,0 @@ -///* -//Licensed to the Apache Software Foundation (ASF) under one -//or more contributor license agreements. See the NOTICE file -//distributed with this work for additional information -//regarding copyright ownership. The ASF licenses this file -//to you under the Apache License, Version 2.0 (the -//"License"); you may not use this file except in compliance -//with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -//Unless required by applicable law or agreed to in writing, -//software distributed under the License is distributed on an -//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -//KIND, either express or implied. See the License for the -//specific language governing permissions and limitations -//under the License. -//*/ -//package org.apache.griffin.measure.data.connector.cache -// -//import java.util.concurrent.TimeUnit -// -//import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache} -//import org.apache.griffin.measure.config.params.user.DataCacheParam -//import org.apache.griffin.measure.result.TimeStampInfo -//import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, JsonUtil, TimeUtil} -//import org.apache.spark.rdd.RDD -//import org.apache.spark.sql.SQLContext -//import org.apache.spark.sql.hive.HiveContext -// -//import scala.util.{Success, Try} -// -//case class HiveCacheDataConnector(sqlContext: SQLContext, dataCacheParam: DataCacheParam -// ) extends CacheDataConnector { -// -// if (!sqlContext.isInstanceOf[HiveContext]) { -// throw new Exception("hive context not prepared!") -// } -// -// val config = dataCacheParam.config -// val InfoPath = "info.path" -// val cacheInfoPath: String = config.getOrElse(InfoPath, defCacheInfoPath).toString -// -// val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new") -// val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old") -// -// val timeRangeParam: List[String] = if (dataCacheParam.timeRange != null) dataCacheParam.timeRange else Nil -// val deltaTimeRange: (Long, Long) = (timeRangeParam ::: List("0", "0")) match { -// case s :: e :: _ => { -// val ns = TimeUtil.milliseconds(s) match { -// case Some(n) if (n < 0) => n -// case _ => 0 -// } -// val ne = TimeUtil.milliseconds(e) match { -// case Some(n) if (n < 0) => n -// case _ => 0 -// } -// (ns, ne) -// } -// case _ => (0, 0) -// } -// -// val Database = "database" -// val database: String = config.getOrElse(Database, "").toString -// val TableName = "table.name" -// val tableName: String = 
config.get(TableName) match { -// case Some(s: String) if (s.nonEmpty) => s -// case _ => throw new Exception("invalid table.name!") -// } -// val ParentPath = "parent.path" -// val parentPath: String = config.get(ParentPath) match { -// case Some(s: String) => s -// case _ => throw new Exception("invalid parent.path!") -// } -// val tablePath = HdfsUtil.getHdfsFilePath(parentPath, tableName) -// -// val concreteTableName = if (dbPrefix) s"${database}.${tableName}" else tableName -// -// val ReadyTimeInterval = "ready.time.interval" -// val ReadyTimeDelay = "ready.time.delay" -// val readyTimeInterval: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeInterval, "1m").toString).getOrElse(60000L) -// val readyTimeDelay: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeDelay, "1m").toString).getOrElse(60000L) -// -// val TimeStampColumn: String = TimeStampInfo.key -// val PayloadColumn: String = "payload" -// -//// type Schema = (Long, String) -// val schema: List[(String, String)] = List( -// (TimeStampColumn, "bigint"), -// (PayloadColumn, "string") -// ) -// val schemaName = schema.map(_._1) -// -//// type Partition = (Long, Long) -// val partition: List[(String, String, String)] = List( -// ("hr", "bigint", "hour"), -// ("min", "bigint", "min") -// ) -// val partitionName = partition.map(_._1) -// -// private val fieldSep = """|""" -// private val rowSep = """\n""" -// private val rowSepLiteral = "\n" -// -// private def dbPrefix(): Boolean = { -// database.nonEmpty && !database.equals("default") -// } -// -// private def tableExists(): Boolean = { -// Try { -// if (dbPrefix) { -// sqlContext.tables(database).filter(tableExistsSql).collect.size -// } else { -// sqlContext.tables().filter(tableExistsSql).collect.size -// } -// } match { -// case Success(s) => s > 0 -// case _ => false -// } -// } -// -// override def init(): Unit = { -// try { -// if (tableExists) { -// // drop exist table -// val dropSql = s"""DROP TABLE ${concreteTableName}""" -// sqlContext.sql(dropSql) -// } -// -// val colsSql = schema.map { field => -// s"`${field._1}` ${field._2}" -// }.mkString(", ") -// val partitionsSql = partition.map { partition => -// s"`${partition._1}` ${partition._2}" -// }.mkString(", ") -// val sql = s"""CREATE EXTERNAL TABLE IF NOT EXISTS ${concreteTableName} -// |(${colsSql}) PARTITIONED BY (${partitionsSql}) -// |ROW FORMAT DELIMITED -// |FIELDS TERMINATED BY '${fieldSep}' -// |LINES TERMINATED BY '${rowSep}' -// |STORED AS TEXTFILE -// |LOCATION '${tablePath}'""".stripMargin -// sqlContext.sql(sql) -// } catch { -// case e: Throwable => throw e -// } -// } -// -// def available(): Boolean = { -// true -// } -// -// private def encode(data: Map[String, Any], ms: Long): Option[List[Any]] = { -// try { -// Some(schema.map { field => -// val (name, _) = field -// name match { -// case TimeStampColumn => ms -// case PayloadColumn => JsonUtil.toJson(data) -// case _ => null -// } -// }) -// } catch { -// case _ => None -// } -// } -// -// private def decode(data: List[Any], updateTimeStamp: Boolean): Option[Map[String, Any]] = { -// val dataMap = schemaName.zip(data).toMap -// dataMap.get(PayloadColumn) match { -// case Some(v: String) => { -// try { -// val map = JsonUtil.toAnyMap(v) -// val resMap = if (updateTimeStamp) { -// dataMap.get(TimeStampColumn) match { -// case Some(t) => map + (TimeStampColumn -> t) -// case _ => map -// } -// } else map -// Some(resMap) -// } catch { -// case _ => None -// } -// } -// case _ => None -// } -// } -// -// def saveData(rdd: 
RDD[Map[String, Any]], ms: Long): Unit = { -// val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS) -// if (newCacheLocked) { -// try { -// val ptns = getPartition(ms) -// val ptnsPath = genPartitionHdfsPath(ptns) -// val dirPath = s"${tablePath}/${ptnsPath}" -// val fileName = s"${ms}" -// val filePath = HdfsUtil.getHdfsFilePath(dirPath, fileName) -// -// // encode data -// val dataRdd: RDD[List[Any]] = rdd.flatMap(encode(_, ms)) -// -// // save data -// val recordRdd: RDD[String] = dataRdd.map { dt => -// dt.map(_.toString).mkString(fieldSep) -// } -// -// val dumped = if (!recordRdd.isEmpty) { -// HdfsFileDumpUtil.dump(filePath, recordRdd, rowSepLiteral) -// } else false -// -// // add partition -// if (dumped) { -// val sql = addPartitionSql(concreteTableName, ptns) -// sqlContext.sql(sql) -// } -// -// // submit ms -// submitCacheTime(ms) -// submitReadyTime(ms) -// } catch { -// case e: Throwable => error(s"save data error: ${e.getMessage}") -// } finally { -// newCacheLock.unlock() -// } -// } -// } -// -// def readData(): Try[RDD[Map[String, Any]]] = Try { -// val timeRange = TimeInfoCache.getTimeRange -// submitLastProcTime(timeRange._2) -// -// val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2) -// submitCleanTime(reviseTimeRange._1) -// -// // read directly through partition info -// val partitionRange = getPartitionRange(reviseTimeRange._1, reviseTimeRange._2) -// val sql = selectSql(concreteTableName, partitionRange) -// val df = sqlContext.sql(sql) -// -// // decode data -// df.flatMap { row => -// val dt = schemaName.map { sn => -// row.getAs[Any](sn) -// } -// decode(dt, true) -// } -// } -// -// override def cleanOldData(): Unit = { -// val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS) -// if (oldCacheLocked) { -// try { -// val cleanTime = readCleanTime() -// cleanTime match { -// case Some(ct) => { -// // drop partition -// val bound = getPartition(ct) -// val sql = dropPartitionSql(concreteTableName, bound) -// sqlContext.sql(sql) -// } -// case _ => { -// // do nothing -// } -// } -// } catch { -// case e: Throwable => error(s"clean old data error: ${e.getMessage}") -// } finally { -// oldCacheLock.unlock() -// } -// } -// } -// -// override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = { -// // parallel process different time groups, lock is unnecessary -// val ptns = getPartition(t) -// val ptnsPath = genPartitionHdfsPath(ptns) -// val dirPath = s"${tablePath}/${ptnsPath}" -// val fileName = s"${t}" -// val filePath = HdfsUtil.getHdfsFilePath(dirPath, fileName) -// -// try { -// // remove out time old data -// HdfsFileDumpUtil.remove(dirPath, fileName, true) -// -// // save updated old data -// if (oldData.size > 0) { -// val recordDatas = oldData.flatMap { dt => -// encode(dt, t) -// } -// val records: Iterable[String] = recordDatas.map { dt => -// dt.map(_.toString).mkString(fieldSep) -// } -// val dumped = HdfsFileDumpUtil.dump(filePath, records, rowSepLiteral) -// } -// } catch { -// case e: Throwable => error(s"update old data error: ${e.getMessage}") -// } -// } -// -// override protected def genCleanTime(ms: Long): Long = { -// val minPartition = partition.last -// val t1 = TimeUtil.timeToUnit(ms, minPartition._3) -// val t2 = TimeUtil.timeFromUnit(t1, minPartition._3) -// t2 -// } -// -// private def getPartition(ms: Long): List[(String, Any)] = { -// partition.map { p => -// val (name, _, unit) = p -// val t = TimeUtil.timeToUnit(ms, unit) -// (name, t) -// } -// } -// 
private def getPartitionRange(ms1: Long, ms2: Long): List[(String, (Any, Any))] = { -// partition.map { p => -// val (name, _, unit) = p -// val t1 = TimeUtil.timeToUnit(ms1, unit) -// val t2 = TimeUtil.timeToUnit(ms2, unit) -// (name, (t1, t2)) -// } -// } -// -// private def genPartitionHdfsPath(partition: List[(String, Any)]): String = { -// partition.map(prtn => s"${prtn._1}=${prtn._2}").mkString("/") -// } -// private def addPartitionSql(tbn: String, partition: List[(String, Any)]): String = { -// val partitionSql = partition.map(ptn => (s"`${ptn._1}` = ${ptn._2}")).mkString(", ") -// val sql = s"""ALTER TABLE ${tbn} ADD IF NOT EXISTS PARTITION (${partitionSql})""" -// sql -// } -// private def selectSql(tbn: String, partitionRange: List[(String, (Any, Any))]): String = { -// val clause = partitionRange.map { pr => -// val (name, (r1, r2)) = pr -// s"""`${name}` BETWEEN '${r1}' and '${r2}'""" -// }.mkString(" AND ") -// val whereClause = if (clause.nonEmpty) s"WHERE ${clause}" else "" -// val sql = s"""SELECT * FROM ${tbn} ${whereClause}""" -// sql -// } -// private def dropPartitionSql(tbn: String, partition: List[(String, Any)]): String = { -// val partitionSql = partition.map(ptn => (s"PARTITION ( `${ptn._1}` < '${ptn._2}' ) ")).mkString(", ") -// val sql = s"""ALTER TABLE ${tbn} DROP ${partitionSql}""" -// println(sql) -// sql -// } -// -// private def tableExistsSql(): String = { -// s"tableName LIKE '${tableName}'" -// } -// -//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/TextCacheDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/TextCacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/TextCacheDataConnector.scala deleted file mode 100644 index 0daf2d9..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/TextCacheDataConnector.scala +++ /dev/null @@ -1,311 +0,0 @@ -///* -//Licensed to the Apache Software Foundation (ASF) under one -//or more contributor license agreements. See the NOTICE file -//distributed with this work for additional information -//regarding copyright ownership. The ASF licenses this file -//to you under the Apache License, Version 2.0 (the -//"License"); you may not use this file except in compliance -//with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -//Unless required by applicable law or agreed to in writing, -//software distributed under the License is distributed on an -//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -//KIND, either express or implied. See the License for the -//specific language governing permissions and limitations -//under the License. 
-//*/ -//package org.apache.griffin.measure.data.connector.cache -// -//import java.util.concurrent.TimeUnit -// -//import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache} -//import org.apache.griffin.measure.config.params.user.DataCacheParam -//import org.apache.griffin.measure.result.TimeStampInfo -//import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, JsonUtil, TimeUtil} -//import org.apache.spark.rdd.RDD -//import org.apache.spark.sql.SQLContext -// -//import scala.util.Try -// -//case class TextCacheDataConnector(sqlContext: SQLContext, dataCacheParam: DataCacheParam -// ) extends CacheDataConnector { -// -// val config = dataCacheParam.config -// val InfoPath = "info.path" -// val cacheInfoPath: String = config.getOrElse(InfoPath, defCacheInfoPath).toString -// -// val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new") -// val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old") -// -// val timeRangeParam: List[String] = if (dataCacheParam.timeRange != null) dataCacheParam.timeRange else Nil -// val deltaTimeRange: (Long, Long) = (timeRangeParam ::: List("0", "0")) match { -// case s :: e :: _ => { -// val ns = TimeUtil.milliseconds(s) match { -// case Some(n) if (n < 0) => n -// case _ => 0 -// } -// val ne = TimeUtil.milliseconds(e) match { -// case Some(n) if (n < 0) => n -// case _ => 0 -// } -// (ns, ne) -// } -// case _ => (0, 0) -// } -// -// val FilePath = "file.path" -// val filePath: String = config.get(FilePath) match { -// case Some(s: String) => s -// case _ => throw new Exception("invalid file.path!") -// } -// -// val ReadyTimeInterval = "ready.time.interval" -// val ReadyTimeDelay = "ready.time.delay" -// val readyTimeInterval: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeInterval, "1m").toString).getOrElse(60000L) -// val readyTimeDelay: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeDelay, "1m").toString).getOrElse(60000L) -// -//// val TimeStampColumn: String = TimeStampInfo.key -//// val PayloadColumn: String = "payload" -// -// // cache schema: Long, String -//// val fields = List[StructField]( -//// StructField(TimeStampColumn, LongType), -//// StructField(PayloadColumn, StringType) -//// ) -//// val schema = StructType(fields) -// -// // case class CacheData(time: Long, payload: String) { -// // def getTime(): Long = time -// // def getPayload(): String = payload -// // } -// -// private val rowSepLiteral = "\n" -// -// val partitionUnits: List[String] = List("hour", "min") -// -// override def init(): Unit = { -// // do nothing -// } -// -// def available(): Boolean = { -// true -// } -// -// private def encode(data: Map[String, Any], ms: Long): Option[String] = { -// try { -// val map = data + (TimeStampInfo.key -> ms) -// Some(JsonUtil.toJson(map)) -// } catch { -// case _: Throwable => None -// } -// } -// -// private def decode(data: String): Option[Map[String, Any]] = { -// try { -// Some(JsonUtil.toAnyMap(data)) -// } catch { -// case _: Throwable => None -// } -// } -// -// def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit = { -// val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS) -// if (newCacheLocked) { -// try { -// val ptns = getPartition(ms) -// val ptnsPath = genPartitionHdfsPath(ptns) -// val dirPath = s"${filePath}/${ptnsPath}" -// val dataFileName = s"${ms}" -// val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName) -// -// // encode data -// val dataRdd: RDD[String] = rdd.flatMap(encode(_, ms)) -// -// // save data -// val dumped 
= if (!dataRdd.isEmpty) { -// HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral) -// } else false -// -// // submit ms -// submitCacheTime(ms) -// submitReadyTime(ms) -// } catch { -// case e: Throwable => error(s"save data error: ${e.getMessage}") -// } finally { -// newCacheLock.unlock() -// } -// } -// } -// -// def readData(): Try[RDD[Map[String, Any]]] = Try { -// val timeRange = TimeInfoCache.getTimeRange -// submitLastProcTime(timeRange._2) -// -// val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2) -// submitCleanTime(reviseTimeRange._1) -// -// // read directly through partition info -// val partitionRanges = getPartitionRange(reviseTimeRange._1, reviseTimeRange._2) -// println(s"read time ranges: ${reviseTimeRange}") -// println(s"read partition ranges: ${partitionRanges}") -// -// // list partition paths -// val partitionPaths = listPathsBetweenRanges(filePath :: Nil, partitionRanges) -// -// if (partitionPaths.isEmpty) { -// sqlContext.sparkContext.emptyRDD[Map[String, Any]] -// } else { -// val filePaths = partitionPaths.mkString(",") -// val rdd = sqlContext.sparkContext.textFile(filePaths) -// -// // decode data -// rdd.flatMap { row => -// decode(row) -// } -// } -// } -// -// override def cleanOldData(): Unit = { -// val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS) -// if (oldCacheLocked) { -// try { -// val cleanTime = readCleanTime() -// cleanTime match { -// case Some(ct) => { -// // drop partitions -// val bounds = getPartition(ct) -// -// // list partition paths -// val earlierPaths = listPathsEarlierThanBounds(filePath :: Nil, bounds) -// -// // delete out time data path -// earlierPaths.foreach { path => -// println(s"delete hdfs path: ${path}") -// HdfsUtil.deleteHdfsPath(path) -// } -// } -// case _ => { -// // do nothing -// } -// } -// } catch { -// case e: Throwable => error(s"clean old data error: ${e.getMessage}") -// } finally { -// oldCacheLock.unlock() -// } -// } -// } -// -// override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = { -// // parallel process different time groups, lock is unnecessary -// val ptns = getPartition(t) -// val ptnsPath = genPartitionHdfsPath(ptns) -// val dirPath = s"${filePath}/${ptnsPath}" -// val dataFileName = s"${t}" -// val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName) -// -// try { -// // remove out time old data -// HdfsFileDumpUtil.remove(dirPath, dataFileName, true) -// -// // save updated old data -// if (oldData.size > 0) { -// val recordDatas = oldData.flatMap { dt => -// encode(dt, t) -// } -// val dumped = HdfsFileDumpUtil.dump(dataFilePath, recordDatas, rowSepLiteral) -// } -// } catch { -// case e: Throwable => error(s"update old data error: ${e.getMessage}") -// } -// } -// -// override protected def genCleanTime(ms: Long): Long = { -// val minPartitionUnit = partitionUnits.last -// val t1 = TimeUtil.timeToUnit(ms, minPartitionUnit) -// val t2 = TimeUtil.timeFromUnit(t1, minPartitionUnit) -// t2 -// } -// -// private def getPartition(ms: Long): List[Long] = { -// partitionUnits.map { unit => -// TimeUtil.timeToUnit(ms, unit) -// } -// } -// private def getPartitionRange(ms1: Long, ms2: Long): List[(Long, Long)] = { -// partitionUnits.map { unit => -// val t1 = TimeUtil.timeToUnit(ms1, unit) -// val t2 = TimeUtil.timeToUnit(ms2, unit) -// (t1, t2) -// } -// } -// -// private def genPartitionHdfsPath(partition: List[Long]): String = { -// partition.map(prtn => s"${prtn}").mkString("/") -// } -// -// private def 
str2Long(str: String): Option[Long] = { -// try { -// Some(str.toLong) -// } catch { -// case e: Throwable => None -// } -// } -// -// // here the range means [min, max], but the best range should be (min, max] -// private def listPathsBetweenRanges(paths: List[String], -// partitionRanges: List[(Long, Long)] -// ): List[String] = { -// partitionRanges match { -// case Nil => paths -// case head :: tail => { -// val (lb, ub) = head -// val curPaths = paths.flatMap { path => -// val names = HdfsUtil.listSubPaths(path, "dir").toList -// names.filter { name => -// str2Long(name) match { -// case Some(t) => (t >= lb) && (t <= ub) -// case _ => false -// } -// }.map(HdfsUtil.getHdfsFilePath(path, _)) -// } -// listPathsBetweenRanges(curPaths, tail) -// } -// } -// } -// -// private def listPathsEarlierThanBounds(paths: List[String], bounds: List[Long] -// ): List[String] = { -// bounds match { -// case Nil => paths -// case head :: tail => { -// val earlierPaths = paths.flatMap { path => -// val names = HdfsUtil.listSubPaths(path, "dir").toList -// names.filter { name => -// str2Long(name) match { -// case Some(t) => (t < head) -// case _ => false -// } -// }.map(HdfsUtil.getHdfsFilePath(path, _)) -// } -// val equalPaths = paths.flatMap { path => -// val names = HdfsUtil.listSubPaths(path, "dir").toList -// names.filter { name => -// str2Long(name) match { -// case Some(t) => (t == head) -// case _ => false -// } -// }.map(HdfsUtil.getHdfsFilePath(path, _)) -// } -// -// tail match { -// case Nil => earlierPaths -// case _ => earlierPaths ::: listPathsEarlierThanBounds(equalPaths, tail) -// } -// } -// } -// } -// -//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala deleted file mode 100644 index 84316b3..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala +++ /dev/null @@ -1,87 +0,0 @@ -///* -//Licensed to the Apache Software Foundation (ASF) under one -//or more contributor license agreements. See the NOTICE file -//distributed with this work for additional information -//regarding copyright ownership. The ASF licenses this file -//to you under the Apache License, Version 2.0 (the -//"License"); you may not use this file except in compliance -//with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -//Unless required by applicable law or agreed to in writing, -//software distributed under the License is distributed on an -//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -//KIND, either express or implied. See the License for the -//specific language governing permissions and limitations -//under the License. 
-//*/ -//package org.apache.griffin.measure.persist -// -//import org.apache.griffin.measure.result._ -//import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil} -//import org.apache.spark.rdd.RDD -// -//// persist result by old http way -- temporary way -//case class OldHttpPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist { -// -// val Api = "api" -// val Method = "method" -// -// val api = config.getOrElse(Api, "").toString -// val method = config.getOrElse(Method, "post").toString -// -// def available(): Boolean = { -// api.nonEmpty -// } -// -// def start(msg: String): Unit = {} -// def finish(): Unit = {} -// -// def result(rt: Long, result: Result): Unit = { -// result match { -// case ar: AccuracyResult => { -// val matchPercentage: Double = if (ar.getTotal <= 0) 0 else (ar.getMatch * 1.0 / ar.getTotal) * 100 -// val dataMap = Map[String, Any](("metricName" -> metricName), ("timestamp" -> timeStamp), ("value" -> matchPercentage), ("count" -> ar.getTotal)) -// httpResult(dataMap) -// } -// case pr: ProfileResult => { -// val dataMap = Map[String, Any](("metricName" -> metricName), ("timestamp" -> timeStamp), ("value" -> pr.getMatch), ("count" -> pr.getTotal)) -// httpResult(dataMap) -// } -// case _ => { -// info(s"result: ${result}") -// } -// } -// } -// -// private def httpResult(dataMap: Map[String, Any]) = { -// try { -// val data = JsonUtil.toJson(dataMap) -// // post -// val params = Map[String, Object]() -// val header = Map[String, Object](("content-type" -> "application/json")) -// -// def func(): Boolean = { -// HttpUtil.httpRequest(api, method, params, header, data) -// } -// -// PersistThreadPool.addTask(func _, 10) -// -//// val status = HttpUtil.httpRequest(api, method, params, header, data) -//// info(s"${method} to ${api} response status: ${status}") -// } catch { -// case e: Throwable => error(e.getMessage) -// } -// -// } -// -// def records(recs: RDD[String], tp: String): Unit = {} -// def records(recs: Iterable[String], tp: String): Unit = {} -// -//// def missRecords(records: RDD[String]): Unit = {} -//// def matchRecords(records: RDD[String]): Unit = {} -// -// def log(rt: Long, msg: String): Unit = {} -// -//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/process/Algo.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/Algo.scala b/measure/src/main/scala/org/apache/griffin/measure/process/Algo.scala deleted file mode 100644 index 7f1b153..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/process/Algo.scala +++ /dev/null @@ -1,34 +0,0 @@ -///* -//Licensed to the Apache Software Foundation (ASF) under one -//or more contributor license agreements. See the NOTICE file -//distributed with this work for additional information -//regarding copyright ownership. The ASF licenses this file -//to you under the Apache License, Version 2.0 (the -//"License"); you may not use this file except in compliance -//with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -//Unless required by applicable law or agreed to in writing, -//software distributed under the License is distributed on an -//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -//KIND, either express or implied. See the License for the -//specific language governing permissions and limitations -//under the License. 
-//*/ -//package org.apache.griffin.measure.algo -// -//import org.apache.griffin.measure.config.params.env._ -//import org.apache.griffin.measure.config.params.user._ -//import org.apache.griffin.measure.log.Loggable -// -//import scala.util.Try -// -//trait Algo extends Loggable with Serializable { -// -// val envParam: EnvParam -// val userParam: UserParam -// -// def run(): Try[_] -// -//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/resources/test-data.jsonFile ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/test-data.jsonFile b/measure/src/test/resources/test-data.jsonFile deleted file mode 100644 index 73707f4..0000000 --- a/measure/src/test/resources/test-data.jsonFile +++ /dev/null @@ -1,3 +0,0 @@ -{ "name": "emily", "age": 5, "map": { "a": 1, "b": 2 }, "list": [ { "c": 1, "d": 2 }, { "c": 3, "d": 4 } ], "t": [1, 2, 3] } -{ "name": "white", "age": 15, "map": { "a": 11, "b": 12 }, "list": [ { "c": 11, "d": 2 }, { "c": 23, "d": 4 } ], "t": [1, 2, 3] } -{ "name": "west", "age": 25, "map": { "a": 21, "b": 22 }, "list": [ { "c": 11, "d": 2 }, { "c": 23, "d": 4 } ], "t": [1, 2, 3] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/resources/test-data1.jsonFile ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/test-data1.jsonFile b/measure/src/test/resources/test-data1.jsonFile deleted file mode 100644 index 1e1f28a..0000000 --- a/measure/src/test/resources/test-data1.jsonFile +++ /dev/null @@ -1,31 +0,0 @@ -[{ - "Year": "2013", - "First Name": "DAVID", - "County": "KINGS", - "Sex": "M", - "Count": "272" -}, { - "Year": "2013", - "First Name": "JAYDEN", - "County": "KINGS", - "Sex": "M", - "Count": "268" -}, { - "Year": "2013", - "First Name": "JAYDEN", - "County": "QUEENS", - "Sex": "M", - "Count": "219" -}, { - "Year": "2013", - "First Name": "MOSHE", - "County": "KINGS", - "Sex": "M", - "Count": "219" -}, { - "Year": "2013", - "First Name": "ETHAN", - "County": "QUEENS", - "Sex": "M", - "Count": "216" -}] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/scala/org/apache/griffin/measure/cache/InfoCacheInstanceTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/cache/InfoCacheInstanceTest.scala b/measure/src/test/scala/org/apache/griffin/measure/cache/InfoCacheInstanceTest.scala deleted file mode 100644 index fc42d43..0000000 --- a/measure/src/test/scala/org/apache/griffin/measure/cache/InfoCacheInstanceTest.scala +++ /dev/null @@ -1,78 +0,0 @@ -///* -//Licensed to the Apache Software Foundation (ASF) under one -//or more contributor license agreements. See the NOTICE file -//distributed with this work for additional information -//regarding copyright ownership. The ASF licenses this file -//to you under the Apache License, Version 2.0 (the -//"License"); you may not use this file except in compliance -//with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -//Unless required by applicable law or agreed to in writing, -//software distributed under the License is distributed on an -//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -//KIND, either express or implied. 
See the License for the -//specific language governing permissions and limitations -//under the License. -//*/ -//package org.apache.griffin.measure.cache -// -//import java.util.Date -//import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit} -// -//import org.apache.curator.framework.recipes.locks.InterProcessMutex -//import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} -//import org.apache.curator.retry.ExponentialBackoffRetry -//import org.apache.griffin.measure.cache.info.InfoCacheInstance -//import org.apache.griffin.measure.config.params.env.InfoCacheParam -//import org.junit.runner.RunWith -//import org.scalatest.junit.JUnitRunner -//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} -// -//import scala.util.{Failure, Try} -// -//@RunWith(classOf[JUnitRunner]) -//class InfoCacheInstanceTest extends FunSuite with Matchers with BeforeAndAfter { -// -// val map = Map[String, Any]( -// ("hosts" -> "localhost:2181"), -// ("namespace" -> "griffin/infocache"), -// ("lock.path" -> "lock"), -// ("mode" -> "persist"), -// ("init.clear" -> true), -// ("close.clear" -> false) -// ) -// val name = "ttt" -// -// val icp = InfoCacheParam("zk", map) -// val icps = icp :: Nil -// -// before { -// InfoCacheInstance.initInstance(icps, name) -// InfoCacheInstance.init -// } -// -// test ("others") { -// InfoCacheInstance.available should be (true) -// -// val keys = List[String]( -// "key1", "key2" -// ) -// val info = Map[String, String]( -// ("key1" -> "value1"), -// ("key2" -> "value2") -// ) -// -// InfoCacheInstance.cacheInfo(info) should be (true) -// InfoCacheInstance.readInfo(keys) should be (info) -// InfoCacheInstance.deleteInfo(keys) -//// InfoCacheInstance.readInfo(keys) should be (Map[String, String]()) -// -// } -// -// after { -// InfoCacheInstance.close() -// } -// -//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/scala/org/apache/griffin/measure/cache/ZKCacheLockTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/cache/ZKCacheLockTest.scala b/measure/src/test/scala/org/apache/griffin/measure/cache/ZKCacheLockTest.scala deleted file mode 100644 index 271529c..0000000 --- a/measure/src/test/scala/org/apache/griffin/measure/cache/ZKCacheLockTest.scala +++ /dev/null @@ -1,84 +0,0 @@ -///* -//Licensed to the Apache Software Foundation (ASF) under one -//or more contributor license agreements. See the NOTICE file -//distributed with this work for additional information -//regarding copyright ownership. The ASF licenses this file -//to you under the Apache License, Version 2.0 (the -//"License"); you may not use this file except in compliance -//with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -//Unless required by applicable law or agreed to in writing, -//software distributed under the License is distributed on an -//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -//KIND, either express or implied. See the License for the -//specific language governing permissions and limitations -//under the License. 
-//*/ -//package org.apache.griffin.measure.cache -// -//import java.util.Date -//import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit} -// -//import org.apache.curator.framework.recipes.locks.InterProcessMutex -//import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} -//import org.apache.curator.retry.ExponentialBackoffRetry -//import org.apache.griffin.measure.cache.info.ZKInfoCache -//import org.junit.runner.RunWith -//import org.scalatest.junit.JUnitRunner -//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} -// -//import scala.util.{Failure, Try} -// -//@RunWith(classOf[JUnitRunner]) -//class ZKCacheLockTest extends FunSuite with Matchers with BeforeAndAfter { -// -// val map = Map[String, Any]( -// ("hosts" -> "localhost:2181"), -// ("namespace" -> "griffin/infocache"), -// ("lock.path" -> "lock"), -// ("mode" -> "persist"), -// ("init.clear" -> true), -// ("close.clear" -> false) -// ) -// val name = "ttt" -// -// val ic = ZKInfoCache(map, name) -// -// before { -// ic.init -// } -// -// test ("lock") { -// -// case class Proc(n: Int) extends Runnable { -// override def run(): Unit = { -// val cl = ic.genLock("proc") -// val b = cl.lock(2, TimeUnit.SECONDS) -// try { -// println(s"${n}: ${b}") -// if (b) Thread.sleep(3000) -// } finally { -// cl.unlock() -// } -// } -// } -// -// val pool = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor] -// val t = 0 until 10 -// t.foreach(a => pool.submit(Proc(a))) -// -// pool.shutdown() -// val t1 = new Date() -// println(s"${t1}: pool shut down") -// pool.awaitTermination(20, TimeUnit.SECONDS) -// val t2 = new Date() -// println(s"${t2}: pool shut down done [${t2.getTime - t1.getTime}]") -// } -// -// after { -// ic.close() -// } -// -//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/scala/org/apache/griffin/measure/cache/ZKInfoCacheTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/cache/ZKInfoCacheTest.scala b/measure/src/test/scala/org/apache/griffin/measure/cache/ZKInfoCacheTest.scala deleted file mode 100644 index 086170a..0000000 --- a/measure/src/test/scala/org/apache/griffin/measure/cache/ZKInfoCacheTest.scala +++ /dev/null @@ -1,90 +0,0 @@ -///* -//Licensed to the Apache Software Foundation (ASF) under one -//or more contributor license agreements. See the NOTICE file -//distributed with this work for additional information -//regarding copyright ownership. The ASF licenses this file -//to you under the Apache License, Version 2.0 (the -//"License"); you may not use this file except in compliance -//with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -//Unless required by applicable law or agreed to in writing, -//software distributed under the License is distributed on an -//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -//KIND, either express or implied. See the License for the -//specific language governing permissions and limitations -//under the License. 
-//*/ -//package org.apache.griffin.measure.cache -// -//import java.util.Date -//import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit} -// -//import org.apache.curator.framework.recipes.locks.InterProcessMutex -//import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} -//import org.apache.curator.retry.ExponentialBackoffRetry -//import org.apache.griffin.measure.cache.info.ZKInfoCache -//import org.junit.runner.RunWith -//import org.scalatest.junit.JUnitRunner -//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} -// -//import scala.util.{Failure, Try} -// -//@RunWith(classOf[JUnitRunner]) -//class ZKInfoCacheTest extends FunSuite with Matchers with BeforeAndAfter { -// -// val map = Map[String, Any]( -// ("hosts" -> "localhost:2181"), -// ("namespace" -> "griffin/infocache"), -// ("lock.path" -> "lock"), -// ("mode" -> "persist"), -// ("init.clear" -> true), -// ("close.clear" -> false) -// ) -// val name = "ttt" -// -// test ("available") { -// val ic = ZKInfoCache(map, name) -// ic.init -// -// ic.available should be (true) -// -// ic.close -// } -// -// test ("cacheInfo and readInfo") { -// val ic = ZKInfoCache(map, name) -// ic.init -// -// val keys = List[String]( -// "key1", "key2" -// ) -// val info = Map[String, String]( -// ("key1" -> "value1"), -// ("key2" -> "value2") -// ) -// -// ic.cacheInfo(info) should be (true) -// ic.readInfo(keys) should be (info) -// ic.deleteInfo(keys) -// ic.readInfo(keys) should be (Map[String, String]()) -// -// ic.close -// } -// -// test ("genLock") { -// val ic = ZKInfoCache(map, name) -// ic.init -// -// val lock1 = ic.genLock("ttt") -// val lock2 = ic.genLock("ttt") -// lock1.lock(5, TimeUnit.SECONDS) -// lock2.lock(5, TimeUnit.SECONDS) -// lock1.unlock -// lock2.unlock -// -// ic.close -// } -// -//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala b/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala deleted file mode 100644 index 845a051..0000000 --- a/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala +++ /dev/null @@ -1,146 +0,0 @@ -///* -//Licensed to the Apache Software Foundation (ASF) under one -//or more contributor license agreements. See the NOTICE file -//distributed with this work for additional information -//regarding copyright ownership. The ASF licenses this file -//to you under the Apache License, Version 2.0 (the -//"License"); you may not use this file except in compliance -//with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -//Unless required by applicable law or agreed to in writing, -//software distributed under the License is distributed on an -//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -//KIND, either express or implied. See the License for the -//specific language governing permissions and limitations -//under the License. 
-//*/ -//package org.apache.griffin.measure.process -// -//import org.apache.griffin.measure.config.params.env._ -//import org.apache.griffin.measure.config.params.user._ -//import org.apache.griffin.measure.config.params._ -//import org.apache.griffin.measure.config.reader.ParamReaderFactory -//import org.apache.griffin.measure.config.validator.AllParamValidator -//import org.apache.griffin.measure.log.Loggable -//import org.apache.griffin.measure.persist.PersistThreadPool -//import org.junit.runner.RunWith -//import org.scalatest.junit.JUnitRunner -//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} -// -//import scala.util.{Failure, Success, Try} -// -//@RunWith(classOf[JUnitRunner]) -//class BatchProcessTest extends FunSuite with Matchers with BeforeAndAfter with Loggable { -// -// val envFile = "src/test/resources/env-test.json" -// val confFile = "src/test/resources/config-test-profiling.json" -//// val confFile = "src/test/resources/config-test-accuracy.json" -// -// val envFsType = "local" -// val userFsType = "local" -// -// val args = Array(envFile, confFile) -// -// var allParam: AllParam = _ -// -// before { -// // read param files -// val envParam = readParamFile[EnvParam](envFile, envFsType) match { -// case Success(p) => p -// case Failure(ex) => { -// error(ex.getMessage) -// sys.exit(-2) -// } -// } -// val userParam = readParamFile[UserParam](confFile, userFsType) match { -// case Success(p) => p -// case Failure(ex) => { -// error(ex.getMessage) -// sys.exit(-2) -// } -// } -// allParam = AllParam(envParam, userParam) -// -// // validate param files -// validateParams(allParam) match { -// case Failure(ex) => { -// error(ex.getMessage) -// sys.exit(-3) -// } -// case _ => { -// info("params validation pass") -// } -// } -// } -// -// test ("batch process") { -// val procType = ProcessType(allParam.userParam.procType) -// val proc: DqProcess = procType match { -// case BatchProcessType => BatchDqProcess(allParam) -// case StreamingProcessType => StreamingDqProcess(allParam) -// case _ => { -// error(s"${procType} is unsupported process type!") -// sys.exit(-4) -// } -// } -// -// // process init -// proc.init match { -// case Success(_) => { -// info("process init success") -// } -// case Failure(ex) => { -// error(s"process init error: ${ex.getMessage}") -// shutdown -// sys.exit(-5) -// } -// } -// -// // process run -// proc.run match { -// case Success(_) => { -// info("process run success") -// } -// case Failure(ex) => { -// error(s"process run error: ${ex.getMessage}") -// -// if (proc.retriable) { -// throw ex -// } else { -// shutdown -// sys.exit(-5) -// } -// } -// } -// -// // process end -// proc.end match { -// case Success(_) => { -// info("process end success") -// } -// case Failure(ex) => { -// error(s"process end error: ${ex.getMessage}") -// shutdown -// sys.exit(-5) -// } -// } -// -// shutdown -// } -// -// private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = { -// val paramReader = ParamReaderFactory.getParamReader(file, fsType) -// paramReader.readConfig[T] -// } -// -// private def validateParams(allParam: AllParam): Try[Boolean] = { -// val allParamValidator = AllParamValidator() -// allParamValidator.validate(allParam) -// } -// -// private def shutdown(): Unit = { -// PersistThreadPool.shutdown -// } -//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala 
---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala b/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala deleted file mode 100644 index 1273bcf..0000000 --- a/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala +++ /dev/null @@ -1,531 +0,0 @@ -///* -//Licensed to the Apache Software Foundation (ASF) under one -//or more contributor license agreements. See the NOTICE file -//distributed with this work for additional information -//regarding copyright ownership. The ASF licenses this file -//to you under the Apache License, Version 2.0 (the -//"License"); you may not use this file except in compliance -//with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -//Unless required by applicable law or agreed to in writing, -//software distributed under the License is distributed on an -//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -//KIND, either express or implied. See the License for the -//specific language governing permissions and limitations -//under the License. -//*/ -//package org.apache.griffin.measure.process -// -//import org.apache.griffin.measure.config.params._ -//import org.apache.griffin.measure.config.params.env._ -//import org.apache.griffin.measure.config.params.user._ -//import org.apache.griffin.measure.config.reader.ParamReaderFactory -//import org.apache.griffin.measure.config.validator.AllParamValidator -//import org.apache.griffin.measure.log.Loggable -//import org.apache.griffin.measure.persist.PersistThreadPool -//import org.apache.griffin.measure.process.engine.DataFrameOprs -//import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil} -//import org.apache.hadoop.hive.ql.exec.UDF -//import org.apache.spark.{SparkConf, SparkContext} -//import org.apache.spark.sql._ -//import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema -//import org.apache.spark.sql.expressions.UserDefinedAggregateFunction -//import org.apache.spark.sql.hive.HiveContext -//import org.apache.spark.sql.types._ -//import org.junit.runner.RunWith -//import org.scalatest.junit.JUnitRunner -//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} -// -//import scala.collection.mutable.WrappedArray -//import scala.util.{Failure, Success, Try} -// -//@RunWith(classOf[JUnitRunner]) -//class JsonParseTest extends FunSuite with Matchers with BeforeAndAfter with Loggable { -// -// var sparkContext: SparkContext = _ -// var sqlContext: SQLContext = _ -// -// before { -// val conf = new SparkConf().setAppName("test json").setMaster("local[*]") -// sparkContext = new SparkContext(conf) -// sparkContext.setLogLevel("WARN") -//// sqlContext = new HiveContext(sparkContext) -// sqlContext = new SQLContext(sparkContext) -// } -// -// test ("json test") { -// // 0. 
prepare data -//// val dt = -//// """ -//// |{"name": "s1", "age": 12, "items": [1, 2, 3], -//// |"subs": [{"id": 1, "type": "seed"}, {"id": 2, "type": "frog"}], -//// |"inner": {"a": 1, "b": 2}, "jstr": "{\"s1\": \"aaa\", \"s2\": 123}" -//// |}""".stripMargin -//// val rdd0 = sparkContext.parallelize(Seq(dt)).map(Row(_)) -// val rdd0 = sparkContext.textFile("src/test/resources/input.msg").map(Row(_)) -// -// val vtp = StructField("value", StringType) -// val df0 = sqlContext.createDataFrame(rdd0, StructType(Array(vtp))) -// df0.registerTempTable("src") -// -//// val fromJson2Array = (s: String) => { -//// JsonUtil.fromJson[Seq[String]](s) -//// } -//// sqlContext.udf.register("from_json_to_array", fromJson2Array) -//// -//// val df2 = sqlContext.sql("SELECT explode(from_json_to_array(get_json_object(value, '$.seeds'))) as value FROM src") -//// df2.printSchema -//// df2.show(10) -//// df2.registerTempTable("df2") -// -// -// -// // 1. read from json string to extracted json row -//// val readSql = "SELECT value FROM src" -//// val df = sqlContext.sql(readSql) -//// val df = sqlContext.table("src") -//// val rdd = df.map { row => -//// row.getAs[String]("value") -//// } -//// val df1 = sqlContext.read.json(rdd) -//// df1.printSchema -//// df1.show(10) -//// df1.registerTempTable("df1") -// val details = Map[String, Any](("df.name" -> "src")) -// val df1 = DataFrameOprs.fromJson(sqlContext, details) -// df1.registerTempTable("df1") -// -// // 2. extract json array into lines -//// val rdd2 = df1.flatMap { row => -//// row.getAs[WrappedArray[String]]("seeds") -//// } -//// val df2 = sqlContext.read.json(rdd2) -// val df2 = sqlContext.sql("select explode(seeds) as value from df1") -//// val tdf = sqlContext.sql("select name, age, explode(items) as item from df1") -//// tdf.registerTempTable("tdf") -//// val df2 = sqlContext.sql("select struct(name, age, item) as ttt from tdf") -// df2.printSchema -// df2.show(10) -// df2.registerTempTable("df2") -// println(df2.count) -// -// val sql1 = "SELECT value FROM df2" -// val df22 = sqlContext.sql(sql1) -// val rdd22 = df22.map { row => -// row.getAs[String]("value") -// } -// import org.apache.spark.sql.functions._ -// val df23 = sqlContext.read.json(rdd22) -// df23.registerTempTable("df23") -//// df23.withColumn("par", monotonicallyIncreasingId) -// -// val df24 = sqlContext.sql("SELECT url, cast(get_json_object(metadata, '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM df23") -// df24.printSchema -// df24.show(10) -// df24.registerTempTable("df24") -// println(df24.count) -// -//// val df25 = sqlContext.sql("select ") -// -//// -//// // 3. extract json string into row -////// val df3 = sqlContext.sql("select cast(get_json_object(metadata, '$.tracker.crawlRequestCreateTS') as bigint), url from df2") -//// val df3 = sqlContext.sql("select cast(get_json_object(get_json_object(value, '$.metadata'), '$.tracker.crawlRequestCreateTS') as bigint), get_json_object(value, '$.url') from df2") -//// df3.printSchema() -//// df3.show(10) -//// println(df3.count) -// -// -// -//// val df5 = sqlContext.sql("select get_json_object(value, '$.subs') as subs from src") -//// df5.printSchema() -//// df5.show(10) -//// df5.registerTempTable("df5") -//// val rdd5 = df5.map { row => -//// row.getAs[String]("subs") -//// } -//// val df6 = sqlContext.read.json(rdd5) -//// df6.printSchema -//// df6.show(10) -// -// // 2. 
extract json string to row -//// val df2 = sqlContext.sql("select jstr from df1") -//// val rdd2 = df2.map { row => -//// row.getAs[String]("jstr") -//// } -//// val df22 = sqlContext.read.json(rdd2) -//// df22.printSchema -//// df22.show(100) -//// df22.registerTempTable("df2") -//// -//// val df23 = sqlContext.sql("select json_tuple(jstr, 's1', 's2') from df1") -//// df23.printSchema() -//// df23.show(100) -// -// // 3. extract json array into lines ?? -// -// // 3. flatmap from json row to json row -//// val df3 = sqlContext.sql("select explode(subs) as sub, items from df1") -//// df3.printSchema() -//// df3.show(10) -//// df3.registerTempTable("df3") -//// -//// val df4 = sqlContext.sql("select explode(items) as item, sub from df3") -//// df4.printSchema() -//// df4.show(10) -// -//// sqlContext.udf.register("length", (s: WrappedArray[_]) => s.length) -// // -// // val df2 = sqlContext.sql("SELECT inner from df1") -// // df2.registerTempTable("df2") -// // df2.printSchema -// // df2.show(100) -// -//// def children(colname: String, df: DataFrame): Array[DataFrame] = { -//// val parent = df.schema.fields.filter(_.name == colname).head -//// println(parent) -//// val fields: Array[StructField] = parent.dataType match { -//// case x: StructType => x.fields -//// case _ => Array.empty[StructField] -//// } -//// fields.map(x => col(s"$colname.${x.name}")) -////// fields.foreach(println) -//// } -////// -//// children("inner", df2) -//// -//// df2.select(children("bar", df): _*).printSchema -// -//// val df3 = sqlContext.sql("select inline(subs) from df1") -//// df3.printSchema() -//// df3.show(100) -// -//// val rdd2 = df2.flatMap { row => -//// row.getAs[GenericRowWithSchema]("inner") :: Nil -//// } -//// -//// rdd2. -// -//// val funcs = sqlContext.sql("show functions") -//// funcs.printSchema() -//// funcs.show(1000) -//// -//// val desc = sqlContext.sql("describe function inline") -//// desc.printSchema() -//// desc.show(100) -// -// // -// -// } -// -// test ("json test 2") { -// val rdd0 = sparkContext.textFile("src/test/resources/output.msg").map(Row(_)) -// -// val vtp = StructField("value", StringType) -// val df0 = sqlContext.createDataFrame(rdd0, StructType(Array(vtp))) -// df0.registerTempTable("tgt") -// -//// val fromJson2StringArray = (s: String) => { -//// val seq = JsonUtil.fromJson[Seq[Any]](s) -//// seq.map(i => JsonUtil.toJson(i)) -//// } -//// sqlContext.udf.register("from_json_to_string_array", fromJson2StringArray) -//// -//// val df2 = sqlContext.sql("SELECT from_json_to_string_array(get_json_object(value, '$.groups[0].attrsList')) as value FROM tgt") -//// df2.printSchema() -//// df2.show(10) -//// df2.registerTempTable("df2") -//// -//// val indexOfStringArray = (sa: String, ) -// -// -// // 1. 
read from json string to extracted json row -// val readSql = "SELECT value FROM tgt" -// val df = sqlContext.sql(readSql) -// val rdd = df.map { row => -// row.getAs[String]("value") -// } -// val df1 = sqlContext.read.json(rdd) -// df1.printSchema -// df1.show(10) -// df1.registerTempTable("df1") -// -// -// val df2 = sqlContext.sql("select groups[0].attrsList as attrs from df1") -// df2.printSchema -// df2.show(10) -// df2.registerTempTable("df2") -// println(df2.count) -// -// val indexOf = (arr: Seq[String], v: String) => { -// arr.indexOf(v) -// } -// sqlContext.udf.register("index_of", indexOf) -// -// val df3 = sqlContext.sql("select attrs.values[index_of(attrs.name, 'URL')][0] as url, cast(get_json_object(attrs.values[index_of(attrs.name, 'CRAWLMETADATA')][0], '$.tracker.crawlRequestCreateTS') as bigint) as ts from df2") -// df3.printSchema() -// df3.show(10) -// df3.registerTempTable("df3") -// } -// -// test ("testing") { -// val dt = -// """ -// |{"name": "age", "age": 12, "items": [1, 2, 3], -// |"subs": [{"id": 1, "type": "seed"}, {"id": 2, "type": "frog"}], -// |"inner": {"a": 1, "b": 2}, "jstr": "{\"s1\": \"aaa\", \"s2\": 123}", "b": true -// |}""".stripMargin -// val rdd = sparkContext.parallelize(Seq(dt)).map(Row(_)) -// val vtp = StructField("value", StringType) -// val df = sqlContext.createDataFrame(rdd, StructType(Array(vtp))) -// df.registerTempTable("df") -// -// val df1 = sqlContext.read.json(sqlContext.sql("select * from df").map(r => r.getAs[String]("value"))) -// df1.printSchema() -// df1.show(10) -// df1.registerTempTable("df1") -// -// val test = (s: String) => { -// s.toInt -// } -// sqlContext.udf.register("to_int", test) -// -// val df2 = sqlContext.sql("select (b) as aa, inner.a from df1 where age = to_int(\"12\")") -// df2.printSchema() -// df2.show(10) -// } -// -// test ("test input only sql") { -// val rdd0 = sparkContext.textFile("src/test/resources/input.msg").map(Row(_)) -// -// val vtp = StructField("value", StringType) -// val df0 = sqlContext.createDataFrame(rdd0, StructType(Array(vtp))) -// df0.registerTempTable("src") -// df0.show(10) -// -// // 1. 
read from json string to extracted json row -// val df1 = sqlContext.sql("SELECT get_json_object(value, '$.seeds') as seeds FROM src") -// df1.printSchema -// df1.show(10) -// df1.registerTempTable("df1") -// -// val json2StringArray: (String) => Seq[String] = (s: String) => { -// val seq = JsonUtil.fromJson[Seq[String]](s) -//// seq.map(i => JsonUtil.toJson(i)) -// seq -// } -// sqlContext.udf.register("json_to_string_array", json2StringArray) -// -// val df2 = sqlContext.sql("SELECT explode(json_to_string_array(seeds)) as seed FROM df1") -// df2.printSchema -// df2.show(10) -// df2.registerTempTable("df2") -// -// -// val df3 = sqlContext.sql("SELECT get_json_object(seed, '$.url') as url, cast(get_json_object(get_json_object(seed, '$.metadata'), '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM df2") -// df3.printSchema -// df3.show(10) -// } -// -// test ("test output only sql") { -// val rdd0 = sparkContext.textFile("src/test/resources/output.msg").map(Row(_)) -// -// val vtp = StructField("value", StringType) -// val df0 = sqlContext.createDataFrame(rdd0, StructType(Array(vtp))) -// df0.registerTempTable("tgt") -// df0.printSchema() -// df0.show(10) -// -// val json2StringArray: (String) => Seq[String] = (s: String) => { -// JsonUtil.fromJson[Seq[String]](s) -// } -// sqlContext.udf.register("json_to_string_array", json2StringArray) -// -// val json2StringJsonArray: (String) => Seq[String] = (s: String) => { -// val seq = JsonUtil.fromJson[Seq[Any]](s) -// seq.map(i => JsonUtil.toJson(i)) -// } -// sqlContext.udf.register("json_to_string_json_array", json2StringJsonArray) -// -// val indexOf = (arr: Seq[String], v: String) => { -// arr.indexOf(v) -// } -// sqlContext.udf.register("index_of", indexOf) -// -// val indexOfField = (arr: Seq[String], k: String, v: String) => { -// val seq = arr.flatMap { item => -// JsonUtil.fromJson[Map[String, Any]](item).get(k) -// } -// seq.indexOf(v) -// } -// sqlContext.udf.register("index_of_field", indexOfField) -// -// // 1. 
read from json string to extracted json row -// val df1 = sqlContext.sql("SELECT get_json_object(value, '$.groups[0].attrsList') as attrs FROM tgt") -// df1.printSchema -// df1.show(10) -// df1.registerTempTable("df1") -// -// val df2 = sqlContext.sql("SELECT json_to_string_json_array(attrs) as attrs FROM df1") -// df2.printSchema() -// df2.show(10) -// df2.registerTempTable("df2") -// -// val df3 = sqlContext.sql("SELECT attrs[index_of_field(attrs, 'name', 'URL')] as attr1, attrs[index_of_field(attrs, 'name', 'CRAWLMETADATA')] as attr2 FROM df2") -// df3.printSchema() -// df3.show(10) -// df3.registerTempTable("df3") -// -// val df4 = sqlContext.sql("SELECT json_to_string_array(get_json_object(attr1, '$.values'))[0], cast(get_json_object(json_to_string_array(get_json_object(attr2, '$.values'))[0], '$.tracker.crawlRequestCreateTS') as bigint) FROM df3") -// df4.printSchema() -// df4.show(10) -// } -// -// test ("test from json") { -// val fromJson2Map = (str: String) => { -// val a = JsonUtil.fromJson[Map[String, Any]](str) -// a.mapValues { v => -// v match { -// case t: String => t -// case _ => JsonUtil.toJson(v) -// } -// } -// } -// sqlContext.udf.register("from_json_to_map", fromJson2Map) -// -// val fromJson2Array = (str: String) => { -// val a = JsonUtil.fromJson[Seq[Any]](str) -// a.map { v => -// v match { -// case t: String => t -// case _ => JsonUtil.toJson(v) -// } -// } -// } -// sqlContext.udf.register("from_json_to_array", fromJson2Array) -// -// // ======================== -// -// val srdd = sparkContext.textFile("src/test/resources/input.msg").map(Row(_)) -// val svtp = StructField("value", StringType) -// val sdf0 = sqlContext.createDataFrame(srdd, StructType(Array(svtp))) -// sdf0.registerTempTable("sdf0") -// sdf0.show(10) -// -// // 1. read from json string to extracted json row -// val sdf1 = sqlContext.sql("SELECT explode(from_json_to_array(get_json_object(value, '$.seeds'))) as seed FROM sdf0") -// sdf1.printSchema -// sdf1.show(10) -// sdf1.registerTempTable("sdf1") -// -// val sdf2 = sqlContext.sql("SELECT get_json_object(seed, '$.url') as url, cast(get_json_object(get_json_object(seed, '$.metadata'), '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM sdf1") -// sdf2.printSchema -// sdf2.show(10) -// -// // --------------------------------------- -// -// val trdd = sparkContext.textFile("src/test/resources/output.msg").map(Row(_)) -// val tvtp = StructField("value", StringType) -// val tdf0 = sqlContext.createDataFrame(trdd, StructType(Array(tvtp))) -// tdf0.registerTempTable("tdf0") -// tdf0.printSchema() -// tdf0.show(10) -// -//// val json2StringArray: (String) => Seq[String] = (s: String) => { -//// JsonUtil.fromJson[Seq[String]](s) -//// } -//// sqlContext.udf.register("json_to_string_array", json2StringArray) -//// -//// val json2StringJsonArray: (String) => Seq[String] = (s: String) => { -//// val seq = JsonUtil.fromJson[Seq[Any]](s) -//// seq.map(i => JsonUtil.toJson(i)) -//// } -//// sqlContext.udf.register("json_to_string_json_array", json2StringJsonArray) -//// -//// val indexOf = (arr: Seq[String], v: String) => { -//// arr.indexOf(v) -//// } -//// sqlContext.udf.register("index_of", indexOf) -//// -// val indexOfField = (arr: Seq[String], k: String, v: String) => { -// val seq = arr.flatMap { item => -// JsonUtil.fromJson[Map[String, Any]](item).get(k) -// } -// seq.indexOf(v) -// } -// sqlContext.udf.register("index_of_field", indexOfField) -// -// // 1. 
read from json string to extracted json row -//// val df1 = sqlContext.sql("SELECT get_json_object(value, '$.groups[0].attrsList') as attrs FROM tdf0") -// val tdf1 = sqlContext.sql("SELECT from_json_to_array(get_json_object(value, '$.groups[0].attrsList')) as attrs FROM tdf0") -// tdf1.printSchema -// tdf1.show(10) -// tdf1.registerTempTable("tdf1") -// -//// val tdf2 = sqlContext.sql("SELECT attrs[index_of_field(attrs, 'name', 'URL')] as attr1, attrs[index_of_field(attrs, 'name', 'CRAWLMETADATA')] as attr2 FROM tdf1") -//// tdf2.printSchema() -//// tdf2.show(10) -//// tdf2.registerTempTable("tdf2") -// -// val tdf3 = sqlContext.sql("SELECT from_json_to_array(get_json_object(attrs[index_of_field(attrs, 'name', 'URL')], '$.values'))[0] as url, cast(get_json_object(from_json_to_array(get_json_object(attrs[index_of_field(attrs, 'name', 'CRAWLMETADATA')], '$.values'))[0], '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM tdf1") -// tdf3.printSchema() -// tdf3.show(10) -// } -// -// test ("sql functions") { -// val functions = sqlContext.sql("show functions") -// functions.printSchema() -// functions.show(10) -// -// val functionNames = functions.map(_.getString(0)).collect -// functionNames.foreach(println) -// } -// -// test ("test text file read") { -// val partitionPaths = Seq[String]( -// "hdfs://localhost/griffin/streaming/dump/source/418010/25080625/1504837518000", -// "hdfs://localhost/griffin/streaming/dump/target/418010/25080625/1504837518000") -// val df = sqlContext.read.json(partitionPaths: _*) -// df.printSchema() -// df.show(10) -// } -// -// test ("list paths") { -// val filePath = "hdfs://localhost/griffin/streaming/dump/source" -// val partitionRanges = List[(Long, Long)]((0, 0), (-2, 0)) -// val partitionPaths = listPathsBetweenRanges(filePath :: Nil, partitionRanges) -// println(partitionPaths) -// } -// -// private def listPathsBetweenRanges(paths: List[String], -// partitionRanges: List[(Long, Long)] -// ): List[String] = { -// partitionRanges match { -// case Nil => paths -// case head :: tail => { -// val (lb, ub) = head -// val curPaths = paths.flatMap { path => -// val names = HdfsUtil.listSubPathsByType(path, "dir").toList -// println(names) -// names.filter { name => -// str2Long(name) match { -// case Some(t) => (t >= lb) && (t <= ub) -// case _ => false -// } -// }.map(HdfsUtil.getHdfsFilePath(path, _)) -// } -// listPathsBetweenRanges(curPaths, tail) -// } -// } -// } -// -// private def str2Long(str: String): Option[Long] = { -// try { -// Some(str.toLong) -// } catch { -// case e: Throwable => None -// } -// } -//} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala b/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala deleted file mode 100644 index 394917c..0000000 --- a/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala +++ /dev/null @@ -1,85 +0,0 @@ -package org.apache.griffin.measure.process - -import org.apache.griffin.measure.utils.JsonUtil -import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.TypeCheckResult -import org.apache.spark.sql.catalyst.expressions._ -import 
org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback -import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} -import org.apache.spark.sql.execution.datasources.json.JSONOptions -import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String - - -case class JsonToStructs( -// schema: DataType, -// options: Map[String, String], - child: Expression) - extends UnaryExpression with CodegenFallback with ExpectsInputTypes { - override def nullable: Boolean = true - -// def this(schema: DataType, options: Map[String, String], child: Expression) = -// this(schema, options, child, None) - - // Used in `FunctionRegistry` -// def this(child: Expression, schema: Expression) = -// this( -// schema = JsonExprUtils.validateSchemaLiteral(schema), -// options = Map.empty[String, String], -// child = child, -// timeZoneId = None) -// -// def this(child: Expression, schema: Expression, options: Expression) = -// this( -// schema = JsonExprUtils.validateSchemaLiteral(schema), -// options = JsonExprUtils.convertToMapData(options), -// child = child, -// timeZoneId = None) -// -// override def checkInputDataTypes(): TypeCheckResult = schema match { -// case _: StructType | ArrayType(_: StructType, _) => -// super.checkInputDataTypes() -// case _ => TypeCheckResult.TypeCheckFailure( -// s"Input schema ${schema.simpleString} must be a struct or an array of structs.") -// } - - override def dataType: DataType = MapType(StringType, StringType) - -// override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = -// copy(timeZoneId = Option(timeZoneId)) - - override def nullSafeEval(json: Any): Any = { - if (json.toString.trim.isEmpty) return null - - try { - JsonUtil.fromJson[Map[String, Any]](json.toString) - } catch { - case _: Throwable => null - } - } - - override def inputTypes: Seq[DataType] = StringType :: Nil -} -// -//object JsonExprUtils { -// -// def validateSchemaLiteral(exp: Expression): StructType = exp match { -// case Literal(s, StringType) => CatalystSqlParser.parseTableSchema(s.toString) -// case e => throw new AnalysisException(s"Expected a string literal instead of $e") -// } -// -// def convertToMapData(exp: Expression): Map[String, String] = exp match { -// case m: CreateMap -// if m.dataType.acceptsType(MapType(StringType, StringType, valueContainsNull = false)) => -// val arrayMap = m.eval().asInstanceOf[ArrayBasedMapData] -// ArrayBasedMapData.toScalaMap(arrayMap).map { case (key, value) => -// key.toString -> value.toString -// } -// case m: CreateMap => -// throw new AnalysisException( -// s"A type of keys and values in map() must be string, but got ${m.dataType}") -// case _ => -// throw new AnalysisException("Must use a map() function for options") -// } -//} \ No newline at end of file