This is an automated email from the ASF dual-hosted git repository.

guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
View the commit online:
https://github.com/apache/griffin/commit/48f304dc316ed380fef2c5c7891f7ec41e0ddd30

The following commit(s) were added to refs/heads/master by this push:
     new 48f304d  [GRIFFIN-297] Allow support for additional file based data sources

48f304d is described below

commit 48f304dc316ed380fef2c5c7891f7ec41e0ddd30
Author: chitralverma <[email protected]>
AuthorDate: Thu Nov 21 09:26:44 2019 +0800

[GRIFFIN-297] Allow support for additional file based data sources

**What changes were proposed in this pull request?**

This PR extends the current file based support, which covers only Avro and Text, to various other file based data sources (Parquet, ORC, etc.).

- Allows users to specify additional file based data sources like Parquet, CSV, TSV, ORC, etc.
- Allows data to be read directly from stand-alone files as well as from directories, on both local and distributed file systems.
- Allows users to specify a schema directly in the connector config (useful for CSV/TSV types); see the second sample below.

A sample config looks like this:

```
{
  "name": "source",
  "baseline": true,
  "connectors": [
    {
      "type": "file",
      "version": "1.7",
      "config": {
        "format": "parquet",
        "options": {
          "k1": "v1",
          "k2": "v2"
        },
        "paths": [
          "/home/chitral/path/to/source/",
          "/home/chitral/path/to/test.parquet"
        ]
      }
    }
  ]
}
```
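For delimited formats without a self-describing schema, the schema can instead be supplied inline through the `schema` config key. A sketch of such a config follows; the path and column definitions are illustrative, while the `schema` entry format (`name`/`type`/`nullable`) follows the connector's spec:

```
{
  "name": "source",
  "baseline": true,
  "connectors": [
    {
      "type": "file",
      "version": "1.7",
      "config": {
        "format": "csv",
        "schema": [
          { "name": "user_id", "type": "string", "nullable": "true" },
          { "name": "age", "type": "int", "nullable": "false" }
        ],
        "paths": [
          "/path/to/source.csv"
        ]
      }
    }
  ]
}
```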
**Does this PR introduce any user-facing change?**

No

**How was this patch tested?**

Griffin test suite. Some additional unit tests have also been added.

Author: chitralverma <[email protected]>

Closes #555 from chitralverma/allow_file_based_batch_connectors.
---
 .../connector/DataConnectorFactory.scala           |   2 +
 .../connector/batch/FileBasedDataConnector.scala   | 209 +++++++++++++++++++
 .../batch/FileBasedDataConnectorTest.scala         | 223 +++++++++++++++++++++
 3 files changed, 434 insertions(+)

diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
index a1ef3ba..371fb7b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -35,6 +35,7 @@ object DataConnectorFactory extends Loggable {
 
   val HiveRegex = """^(?i)hive$""".r
   val AvroRegex = """^(?i)avro$""".r
+  val FileRegex = """^(?i)file$""".r
   val TextDirRegex = """^(?i)text-dir$""".r
   val KafkaRegex = """^(?i)kafka$""".r
 
@@ -62,6 +63,7 @@ object DataConnectorFactory extends Loggable {
     conType match {
       case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
       case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
+      case FileRegex() => FileBasedDataConnector(sparkSession, dcParam, tmstCache)
       case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
       case CustomRegex() => getCustomConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
       case KafkaRegex() =>
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
new file mode 100644
index 0000000..f0a000c
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
@@ -0,0 +1,209 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+ */
+package org.apache.griffin.measure.datasource.connector.batch
+
+import scala.collection.mutable.{Map => MutableMap}
+import scala.util.{Success, Try}
+
+import org.apache.spark.sql.{DataFrame, DataFrameReader, SparkSession}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.utils.HdfsUtil
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+ * A batch data connector for file based sources that supports various
+ * formats like Parquet, CSV, TSV, ORC, etc.
+ * Local files can also be read by prepending the `file://` namespace.
+ *
+ * Currently supported formats include Parquet, ORC, AVRO, Text, and delimited types like CSV and TSV.
+ *
+ * Supported Configurations:
+ *  - format : [[String]] specifying the type of file source (parquet, orc, etc.)
+ *  - paths : [[Seq]] specifying the paths to be read
+ *  - options : [[Map]] of format specific options
+ *  - skipErrorPaths : [[Boolean]] specifying whether to continue execution if one or more paths are invalid
+ *  - schema : [[Seq]] of {colName, colType and isNullable} given as key value pairs. If provided, this can
+ *    help skip the schema inference step for some underlying data sources.
+ *
+ * Some defaults assumed by this connector (if not set) are as follows:
+ *  - `delimiter` is \t for the TSV format,
+ *  - `schema` is None,
+ *  - `header` is false,
+ *  - `format` is parquet
+ */
+case class FileBasedDataConnector(@transient sparkSession: SparkSession,
+                                  dcParam: DataConnectorParam,
+                                  timestampStorage: TimestampStorage)
+  extends BatchDataConnector {
+
+  import FileBasedDataConnector._
+
+  val config: Map[String, Any] = dcParam.getConfig
+  var options: MutableMap[String, String] =
+    MutableMap(config.getParamStringMap(Options, Map.empty).toSeq: _*)
+
+  var format: String = config.getString(Format, DefaultFormat).toLowerCase
+  val paths: Seq[String] = config.getStringArr(Paths, Nil)
+  val schemaSeq: Seq[Map[String, String]] = config.getAnyRef[Seq[Map[String, String]]](Schema, Nil)
+  val skipErrorPaths: Boolean = config.getBoolean(SkipErrorPaths, defValue = false)
+
+  val currentSchema: Option[StructType] = Try(getUserDefinedSchema) match {
+    case Success(structType) if structType.fields.nonEmpty => Some(structType)
+    case _ => None
+  }
+
+  assert(SupportedFormats.contains(format),
+    s"Invalid format '$format' specified. Must be one of ${SupportedFormats.mkString("['", "', '", "']")}")
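+
+  // "tsv" is not a native Spark source: it is aliased below to the "csv" source with a tab
+  // delimiter, unless the user has already supplied an explicit "delimiter" option.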
+  if (format == "csv") validateCSVOptions()
+  if (format == "tsv") {
+    format = "csv"
+    options.getOrElseUpdate(Delimiter, TabDelimiter)
+  }
+
+  /**
+   * Builds a [[StructType]] from the schema definition provided via the `schema` config.
+   *
+   * @example
+   * {"schema":[{"name":"user_id","type":"string","nullable":"true"},{"name":"age","type":"int","nullable":"false"}]}
+   * {"schema":[{"name":"user_id","type":"decimal(5,2)","nullable":"true"}]}
+   * {"schema":[{"name":"my_struct","type":"struct<f1:int,f2:string>","nullable":"true"}]}
+   * @return the user defined schema as a [[StructType]]
+   */
+  private def getUserDefinedSchema: StructType = {
+    schemaSeq.foldLeft(new StructType())((currentStruct, fieldMap) => {
+      val colName = fieldMap(ColName).toLowerCase
+      val colType = fieldMap(ColType).toLowerCase
+      val isNullable = Try(fieldMap(IsNullable).toLowerCase.toBoolean).getOrElse(true)
+
+      currentStruct.add(colName, colType, isNullable)
+    })
+  }
+
+  /**
+   * Ensures the presence of a schema, either via the `header` option or the `schema` config.
+   *
+   *  - If both are present, preference is given to `schema`. The first row is omitted
+   *    if `header` is set to true, and included otherwise.
+   *  - If `schema` is defined, it must be valid.
+   *  - If neither is set, a fatal exception is thrown.
+   */
+  private def validateCSVOptions(): Unit = {
+    if (options.contains(Header) && config.contains(Schema)) {
+      griffinLogger.warn(s"Both $Options.$Header and $Schema were provided. Defaulting to provided $Schema")
+    }
+
+    if (!options.contains(Header) && !config.contains(Schema)) {
+      throw new IllegalArgumentException(s"Either '$Header' must be set in '$Options' or '$Schema' must be set.")
+    }
+
+    if (config.contains(Schema) && (schemaSeq.isEmpty || currentSchema.isEmpty)) {
+      throw new IllegalStateException("Unable to create schema from specification")
+    }
+  }
+
+  def data(ms: Long): (Option[DataFrame], TimeRange) = {
+    val validPaths = getValidPaths(paths, skipErrorPaths)
+
+    val dfOpt = {
+      val dfOpt = Some(
+        sparkSession.read
+          .options(options)
+          .format(format)
+          .withSchemaIfAny(currentSchema)
+          .load(validPaths: _*)
+      )
+      val preDfOpt = preProcess(dfOpt, ms)
+      preDfOpt
+    }
+
+    (dfOpt, TimeRange(ms, readTmst(ms)))
+  }
+}
+
+object FileBasedDataConnector extends Loggable {
+  private val Format: String = "format"
+  private val Paths: String = "paths"
+  private val Options: String = "options"
+  private val SkipErrorPaths: String = "skipErrorPaths"
+  private val Schema: String = "schema"
+  private val Header: String = "header"
+  private val Delimiter: String = "delimiter"
+
+  private val ColName: String = "name"
+  private val ColType: String = "type"
+  private val IsNullable: String = "nullable"
+  private val TabDelimiter: String = "\t"
+
+  private val DefaultFormat: String = SQLConf.DEFAULT_DATA_SOURCE_NAME.defaultValueString
+  private val SupportedFormats: Seq[String] = Seq("parquet", "orc", "avro", "text", "csv", "tsv")
+
+  /**
+   * Validates the existence of each path in the given sequence.
+   * Set the `skipErrorPaths` config to true to avoid fatal errors if any erroneous paths are encountered.
+   *
+   * @param paths given sequence of paths
+   * @param skipOnError flag to skip erroneous paths, if any
+   * @return the subset of paths that exist
+   */
+  private def getValidPaths(paths: Seq[String], skipOnError: Boolean): Seq[String] = {
+    val validPaths = paths.filter(path =>
+      if (HdfsUtil.existPath(path)) true
+      else {
+        val msg = s"Path '$path' does not exist!"
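+        // Either log and skip this path, or fail fast, depending on the skipErrorPaths flag.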
+        if (skipOnError) griffinLogger.error(msg)
+        else throw new IllegalArgumentException(msg)
+
+        false
+      }
+    )
+
+    assert(validPaths.nonEmpty, "No paths were given for the data source.")
+    validPaths
+  }
+
+  /**
+   * Implicitly adds helper methods to [[DataFrameReader]].
+   *
+   * @param dfr an instance of [[DataFrameReader]]
+   */
+  implicit class Implicits(dfr: DataFrameReader) {
+
+    /**
+     * Applies a schema to this [[DataFrameReader]], if one is given.
+     *
+     * @param schemaOpt an optional schema
+     * @return the reader with the schema applied, or the reader unchanged if none was provided
+     */
+    def withSchemaIfAny(schemaOpt: Option[StructType]): DataFrameReader = {
+      schemaOpt match {
+        case Some(structType) => dfr.schema(structType)
+        case None => dfr
+      }
+    }
+  }
+
+}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
new file mode 100644
index 0000000..bf881bb
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
@@ -0,0 +1,223 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector.batch
+
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
+import org.scalatest._
+
+import org.apache.griffin.measure.SparkSuiteBase
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.step.builder.ConstantColumns
+
+class FileBasedDataConnectorTest extends SparkSuiteBase with Matchers {
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    createDataSets(s"file://${getClass.getResource("/").getPath}")
+  }
+
+  private def createDataSets(basePath: String): Unit = {
+    val formats = Seq("parquet", "orc", "csv", "tsv")
+    val schema = new StructType().add("name", StringType).add("age", IntegerType, nullable = true)
+
+    val df = spark.read.schema(schema).csv(s"${basePath}hive/person_table.csv")
+
+    df.cache()
+    formats.foreach(f => {
+      val delimiter = if (f.matches("csv")) "," else if (f.matches("tsv")) "\t" else ""
+      df.write
+        .mode(SaveMode.Overwrite)
+        .option("delimiter", delimiter)
+        .option("header", "true")
+        .format(if (f.matches("tsv")) "csv" else f)
+        .save(s"${basePath}files/person_table.$f")
+    })
+
+    df.unpersist()
+  }
+
+  private final val dcParam = DataConnectorParam("file", "1", "test_df", Map.empty[String, String], Nil)
+  private final val timestampStorage = TimestampStorage()
+
+  // Regarding local filesystem
+
+  "file based data connector" should "be able to read from local filesystem" in {
+    val configs = Map(
+      "format" -> "csv",
+      "paths" -> Seq(
+        s"file://${getClass.getResource("/hive/person_table.csv").getPath}"
+      ),
+      "options" -> Map(
+        "header" -> "false"
+      )
+    )
+
+    val dc = FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
+    val result = dc.data(1000L)
+
+    assert(result._1.isDefined)
+    assert(result._1.get.collect().length == 2)
+  }
+
+  // Regarding user defined schema
+
+  it should "respect the provided schema, if any" in {
+    val configs = Map(
+      "format" -> "csv",
+      "paths" -> Seq(
+        s"file://${getClass.getResource("/hive/person_table.csv").getPath}"
+      )
+    )
+
+    // no schema
+    assertThrows[IllegalArgumentException](
+      FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)
+    )
+
+    // invalid schema
+    assertThrows[IllegalStateException](
+      FileBasedDataConnector(spark, dcParam.copy(config = configs + (("schema", ""))), timestampStorage)
+    )
+
+    // valid schema
+    val result1 = FileBasedDataConnector(
+      spark,
+      dcParam.copy(config = configs + (("schema",
+        Seq(Map("name" -> "name", "type" -> "string"), Map("name" -> "age", "type" -> "int", "nullable" -> "true"))
+      ))),
+      timestampStorage)
+      .data(1L)
+
+    val expSchema = new StructType()
+      .add("name", StringType)
+      .add("age", IntegerType, nullable = true)
+      .add(ConstantColumns.tmst, LongType, nullable = false)
+
+    assert(result1._1.isDefined)
+    assert(result1._1.get.collect().length == 2)
+    assert(result1._1.get.schema == expSchema)
+
+    // valid headers
+    val result2 = FileBasedDataConnector(
+      spark,
+      dcParam.copy(config = configs + (("options", Map(
+        "header" -> "true"
+      )))),
+      timestampStorage)
+      .data(1L)
+
+    assert(result2._1.isDefined)
+    assert(result2._1.get.collect().length == 1)
+    result2._1.get.columns should contain theSameElementsAs Seq("Joey", "14", ConstantColumns.tmst)
+  }
+
+  // Regarding erroneous paths
+
+  it should "respect options if an erroneous path is encountered" in {
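+    // One of the paths below is valid while the other is a random, non-existent
+    // directory; the read only succeeds because "skipErrorPaths" is set.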
+    val configs = Map(
+      "format" -> "csv",
+      "paths" -> Seq(
+        s"file://${getClass.getResource("/hive/person_table.csv").getPath}",
+        s"${java.util.UUID.randomUUID().toString}/"
+      ),
+      "skipErrorPaths" -> true,
+      "options" -> Map(
+        "header" -> "true"
+      )
+    )
+
+    // valid paths
+    val result1 = FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage).data(1L)
+
+    assert(result1._1.isDefined)
+    assert(result1._1.get.collect().length == 1)
+
+    // non-existent path
+    assertThrows[IllegalArgumentException](
+      FileBasedDataConnector(spark, dcParam.copy(config = configs - "skipErrorPaths"), timestampStorage).data(1L)
+    )
+
+    // no path
+    assertThrows[AssertionError](
+      FileBasedDataConnector(spark, dcParam.copy(config = configs - "paths"), timestampStorage).data(1L)
+    )
+  }
+
+  // Regarding various formats
+
+  it should "be able to read all supported file types" in {
+    val formats = Seq("parquet", "orc", "csv", "tsv")
+    formats.foreach(f => {
+      val configs = Map(
+        "format" -> f,
+        "paths" -> Seq(
+          s"file://${getClass.getResource(s"/files/person_table.$f").getPath}"
+        ),
+        "options" -> Map(
+          "header" -> "true",
+          "inferSchema" -> "true"
+        )
+      )
+
+      val result = FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage).data(1L)
+
+      assert(result._1.isDefined)
+
+      val df = result._1.get
+      val expSchema = new StructType()
+        .add("name", StringType)
+        .add("age", IntegerType, nullable = true)
+        .add(ConstantColumns.tmst, LongType, nullable = false)
+
+      assert(df.collect().length == 2)
+      assert(df.schema == expSchema)
+    })
+  }
+
+  it should "apply schema to all formats if provided" in {
+    val formats = Seq("parquet", "orc", "csv", "tsv")
+    formats.foreach(f => {
+      val configs = Map(
+        "format" -> f,
+        "paths" -> Seq(
+          s"file://${getClass.getResource(s"/files/person_table.$f").getPath}"
+        ),
+        "options" -> Map(
+          "header" -> "true"
+        ),
+        "schema" -> Seq(Map("name" -> "name", "type" -> "string"))
+      )
+
+      val result = FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage).data(1L)
+
+      assert(result._1.isDefined)
+
+      val df = result._1.get
+      val expSchema = new StructType()
+        .add("name", StringType)
+        .add(ConstantColumns.tmst, LongType, nullable = false)
+
+      assert(df.collect().length == 2)
+      assert(df.schema == expSchema)
+    })
+  }
+}
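For reference, a minimal invocation of the new connector, mirroring the test suite above. This is only a sketch: `spark`, `dcParam`, and `timestampStorage` are assumed to be set up exactly as in the tests, and the TSV path is illustrative.

```
// Sketch: read a TSV file with an explicit schema through the new connector.
val configs = Map(
  "format" -> "tsv", // aliased internally to Spark's "csv" source with a "\t" delimiter
  "paths" -> Seq("file:///path/to/person_table.tsv"),
  "schema" -> Seq(
    Map("name" -> "name", "type" -> "string"),
    Map("name" -> "age", "type" -> "int", "nullable" -> "true"))
)

val connector = FileBasedDataConnector(spark, dcParam.copy(config = configs), timestampStorage)

// data() returns the loaded DataFrame (if any) together with the time range of the read.
val (dfOpt, timeRange) = connector.data(System.currentTimeMillis())
dfOpt.foreach(_.show()) // columns: name, age, plus the injected ConstantColumns.tmst column
```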
