This is an automated email from the ASF dual-hosted git repository.

guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


View the commit online:
https://github.com/apache/griffin/commit/48f304dc316ed380fef2c5c7891f7ec41e0ddd30

The following commit(s) were added to refs/heads/master by this push:
     new 48f304d  [GRIFFIN-297] Allow support for additional file based data 
sources
48f304d is described below

commit 48f304dc316ed380fef2c5c7891f7ec41e0ddd30
Author: chitralverma <[email protected]>
AuthorDate: Thu Nov 21 09:26:44 2019 +0800

    [GRIFFIN-297] Allow support for additional file based data sources
    
    **What changes were proposed in this pull request?**
    
    The PR extends the current support beyond just Avro and Text for various 
file based data sources (Parquet, ORC, etc).
    
     - Allows users to specify additional file based data sources like Parquet, 
CSV, TSV, ORC etc.
     - Allows data to be read directly from stand-alone files as well as 
directories present in both local/ distributed file systems.
     - Allows users to specify schema directly through options (useful for CSV/ 
TSV types).
    
    A sample config looks like,
    
    ```
    {
      "name": "source",
      "baseline": true,
      "connectors": [
        {
          "type": "file",
          "version": "1.7",
          "config": {
            "format": "parquet",
            "options": {
              "k1": "v1",
              "k2": "v2"
            },
            "paths": [
              "/home/chitral/path/to/source/",
              "/home/chitral/path/to/test.parquet"
            ]
          }
        }
      ]
    }
    
    ```
    **Does this PR introduce any user-facing change?**
    No
    
    **How was this patch tested?**
    Griffin test suite. Some additional unit test has also been added.
    
    Author: chitralverma <[email protected]>
    
    Closes #555 from chitralverma/allow_file_based_batch_connectors.
---
 .../connector/DataConnectorFactory.scala           |   2 +
 .../connector/batch/FileBasedDataConnector.scala   | 209 +++++++++++++++++++
 .../batch/FileBasedDataConnectorTest.scala         | 223 +++++++++++++++++++++
 3 files changed, 434 insertions(+)

diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
index a1ef3ba..371fb7b 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -35,6 +35,7 @@ object DataConnectorFactory extends Loggable {
 
   val HiveRegex = """^(?i)hive$""".r
   val AvroRegex = """^(?i)avro$""".r
+  val FileRegex = """^(?i)file$""".r
   val TextDirRegex = """^(?i)text-dir$""".r
 
   val KafkaRegex = """^(?i)kafka$""".r
@@ -62,6 +63,7 @@ object DataConnectorFactory extends Loggable {
       conType match {
         case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, 
tmstCache)
         case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, 
tmstCache)
+        case FileRegex() => FileBasedDataConnector(sparkSession, dcParam, 
tmstCache)
         case TextDirRegex() => TextDirBatchDataConnector(sparkSession, 
dcParam, tmstCache)
         case CustomRegex() => getCustomConnector(sparkSession, ssc, dcParam, 
tmstCache, streamingCacheClientOpt)
         case KafkaRegex() =>
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
new file mode 100644
index 0000000..f0a000c
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
@@ -0,0 +1,209 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+ */
+package org.apache.griffin.measure.datasource.connector.batch
+
+import scala.collection.mutable.{Map => MutableMap}
+import scala.util.{Success, Try}
+
+import org.apache.spark.sql.{DataFrame, DataFrameReader, SparkSession}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.utils.HdfsUtil
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+ * A batch data connector for file based sources which allows support various
+ * file based data sources like Parquet, CSV, TSV, ORC etc.
+ * Local files can also be read by prepending `file://` namespace.
+ *
+ * Currently supported formats like Parquet, ORC, AVRO, Text and Delimited 
types like CSV, TSV etc.
+ *
+ * Supported Configurations:
+ *  - format : [[String]] specifying the type of file source (parquet, orc, 
etc.).
+ *  - paths : [[Seq]] specifying the paths to be read
+ *  - options : [[Map]] of format specific options
+ *  - skipOnError : [[Boolean]] specifying where to continue execution if one 
or more paths are invalid.
+ *  - schema : [[Seq]] of {colName, colType and isNullable} given as key value 
pairs. If provided, this can
+ * help skip the schema inference step for some underlying data sources.
+ *
+ * Some defaults assumed by this connector (if not set) are as follows:
+ *  - `delimiter` is \t for TSV format,
+ *  - `schema` is None,
+ *  - `header` is false,
+ *  - `format` is parquet
+ */
+case class FileBasedDataConnector(@transient sparkSession: SparkSession,
+                                  dcParam: DataConnectorParam,
+                                  timestampStorage: TimestampStorage)
+  extends BatchDataConnector {
+
+  import FileBasedDataConnector._
+
+  val config: Map[String, Any] = dcParam.getConfig
+  var options: MutableMap[String, String] = 
MutableMap(config.getParamStringMap(Options, Map.empty).toSeq: _*)
+
+  var format: String = config.getString(Format, DefaultFormat).toLowerCase
+  val paths: Seq[String] = config.getStringArr(Paths, Nil)
+  val schemaSeq: Seq[Map[String, String]] = config.getAnyRef[Seq[Map[String, 
String]]](Schema, Nil)
+  val skipErrorPaths: Boolean = config.getBoolean(SkipErrorPaths, defValue = 
false)
+
+  val currentSchema: Option[StructType] = Try(getUserDefinedSchema) match {
+    case Success(structType) if structType.fields.nonEmpty => Some(structType)
+    case _ => None
+  }
+
+  assert(SupportedFormats.contains(format),
+    s"Invalid format '$format' specified. Must be one of 
${SupportedFormats.mkString("['", "', '", "']")}")
+
+  if (format == "csv") validateCSVOptions()
+  if (format == "tsv") {
+    format = "csv"
+    options.getOrElseUpdate(Delimiter, TabDelimiter)
+  }
+
+  /**
+   * Builds a [[StructType]] from the given schema string provided as `Schema` 
config.
+   *
+   * @example
+   * 
{"schema":[{"name":"user_id","type":"string","nullable":"true"},{"name":"age","type":"int","nullable":"false"}]}
+   * {"schema":[{"name":"user_id","type":"decimal(5,2)","nullable":"true"}]}
+   * 
{"schema":[{"name":"my_struct","type":"struct<f1:int,f2:string>","nullable":"true"}]}
+   * @return
+   */
+  private def getUserDefinedSchema: StructType = {
+    schemaSeq.foldLeft(new StructType())((currentStruct, fieldMap) => {
+      val colName = fieldMap(ColName).toLowerCase
+      val colType = fieldMap(ColType).toLowerCase
+      val isNullable = 
Try(fieldMap(IsNullable).toLowerCase.toBoolean).getOrElse(true)
+
+      currentStruct.add(colName, colType, isNullable)
+    })
+  }
+
+  /**
+   * Ensures the presence of schema either via `header` or `schema` options.
+   *
+   *  - If both are present, the preference will be given to `schema`. First 
row will be omitted
+   * if `header` is set to true, else will be included.
+   *  - If `schema` is defined, it must be valid.
+   *  - If neither is set, a fatal exception is thrown.
+   */
+  private def validateCSVOptions(): Unit = {
+    if (options.contains(Header) && config.contains(Schema)) {
+      griffinLogger.warn(s"Both $Options.$Header and $Schema were provided. 
Defaulting to provided $Schema")
+    }
+
+    if (!options.contains(Header) && !config.contains(Schema)) {
+      throw new IllegalArgumentException(s"Either '$Header' must be set in 
'$Options' or '$Schema' must be set.")
+    }
+
+    if (config.contains(Schema) && (schemaSeq.isEmpty || 
currentSchema.isEmpty)) {
+      throw new IllegalStateException("Unable to create schema from 
specification")
+
+    }
+  }
+
+  def data(ms: Long): (Option[DataFrame], TimeRange) = {
+    val validPaths = getValidPaths(paths, skipErrorPaths)
+
+    val dfOpt = {
+      val dfOpt = Some(
+        sparkSession.read
+          .options(options)
+          .format(format)
+          .withSchemaIfAny(currentSchema)
+          .load(validPaths: _*)
+
+      )
+      val preDfOpt = preProcess(dfOpt, ms)
+      preDfOpt
+    }
+
+    (dfOpt, TimeRange(ms, readTmst(ms)))
+  }
+}
+
+object FileBasedDataConnector extends Loggable {
+  private val Format: String = "format"
+  private val Paths: String = "paths"
+  private val Options: String = "options"
+  private val SkipErrorPaths: String = "skipErrorPaths"
+  private val Schema: String = "schema"
+  private val Header: String = "header"
+  private val Delimiter: String = "delimiter"
+
+  private val ColName: String = "name"
+  private val ColType: String = "type"
+  private val IsNullable: String = "nullable"
+  private val TabDelimiter: String = "\t"
+
+  private val DefaultFormat: String = 
SQLConf.DEFAULT_DATA_SOURCE_NAME.defaultValueString
+  private val SupportedFormats: Seq[String] = Seq("parquet", "orc", "avro", 
"text", "csv", "tsv")
+
+  /**
+   * Validates the existence of paths in a given sequence.
+   * Set option `skipOnError` to true to avoid fatal errors if any erroneous 
paths are encountered.
+   *
+   * @param paths       given sequence of paths
+   * @param skipOnError flag to skip erroneous paths if any
+   * @return
+   */
+  private def getValidPaths(paths: Seq[String], skipOnError: Boolean): 
Seq[String] = {
+    val validPaths = paths.filter(path =>
+      if (HdfsUtil.existPath(path)) true
+      else {
+        val msg = s"Path '$path' does not exist!"
+        if (skipOnError) griffinLogger.error(msg)
+        else throw new IllegalArgumentException(msg)
+
+        false
+      }
+    )
+
+    assert(validPaths.nonEmpty, "No paths were given for the data source.")
+    validPaths
+  }
+
+  /**
+   * Adds methods implicitly to [[DataFrameReader]]
+   *
+   * @param dfr an instance of [[DataFrameReader]]
+   */
+  implicit class Implicits(dfr: DataFrameReader) {
+
+    /**
+     * Applies a schema to this [[DataFrameReader]] if any.
+     *
+     * @param schemaOpt an optional Schema
+     * @return
+     */
+    def withSchemaIfAny(schemaOpt: Option[StructType]): DataFrameReader = {
+      schemaOpt match {
+        case Some(structType) => dfr.schema(structType)
+        case None => dfr
+      }
+    }
+  }
+
+}
diff --git 
a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
 
b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
new file mode 100644
index 0000000..bf881bb
--- /dev/null
+++ 
b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
@@ -0,0 +1,223 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector.batch
+
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, 
StructType}
+import org.scalatest._
+
+import org.apache.griffin.measure.SparkSuiteBase
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.step.builder.ConstantColumns
+
+class FileBasedDataConnectorTest extends SparkSuiteBase with Matchers {
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    createDataSets(s"file://${getClass.getResource("/").getPath}")
+  }
+
+  private def createDataSets(basePath: String): Unit = {
+    val formats = Seq("parquet", "orc", "csv", "tsv")
+    val schema = new StructType().add("name", StringType).add("age", 
IntegerType, nullable = true)
+
+    val df = spark.read.schema(schema).csv(s"${basePath}hive/person_table.csv")
+
+    df.cache()
+    formats.foreach(f => {
+      val delimiter = if (f.matches("csv")) "," else if (f.matches("tsv")) 
"\t" else ""
+      df.write
+        .mode(SaveMode.Overwrite)
+        .option("delimiter", delimiter)
+        .option("header", "true")
+        .format(if (f.matches("tsv")) "csv" else f)
+        .save(s"${basePath}files/person_table.$f")
+    })
+
+    df.unpersist()
+  }
+
+  private final val dcParam = DataConnectorParam("file", "1", "test_df", 
Map.empty[String, String], Nil)
+  private final val timestampStorage = TimestampStorage()
+
+  // Regarding Local FileSystem
+
+  "file based data connector" should "be able to read from local filesystem" 
in {
+    val configs = Map(
+      "format" -> "csv",
+      "paths" -> Seq(
+        s"file://${getClass.getResource("/hive/person_table.csv").getPath}"
+      ),
+      "options" -> Map(
+        "header" -> "false"
+      )
+    )
+
+    val dc = FileBasedDataConnector(spark, dcParam.copy(config = configs), 
timestampStorage)
+    val result = dc.data(1000L)
+
+    assert(result._1.isDefined)
+    assert(result._1.get.collect().length == 2)
+  }
+
+  // Regarding User Defined Schema
+
+  it should "respect the provided schema, if any" in {
+    val configs = Map(
+      "format" -> "csv",
+      "paths" -> Seq(
+        s"file://${getClass.getResource("/hive/person_table.csv").getPath}"
+      )
+    )
+
+    // no schema
+    assertThrows[IllegalArgumentException](
+      FileBasedDataConnector(spark, dcParam.copy(config = configs), 
timestampStorage)
+    )
+
+    // invalid schema
+    assertThrows[IllegalStateException](
+      FileBasedDataConnector(spark, dcParam.copy(config = configs + 
(("schema", ""))), timestampStorage)
+    )
+
+    // valid schema
+    val result1 = FileBasedDataConnector(spark,
+      dcParam.copy(config = configs + (("schema",
+        Seq(Map("name" -> "name", "type" -> "string"), Map("name" -> "age", 
"type" -> "int", "nullable" -> "true"))
+      )))
+      , timestampStorage)
+      .data(1L)
+
+    val expSchema = new StructType()
+      .add("name", StringType)
+      .add("age", IntegerType, nullable = true)
+      .add(ConstantColumns.tmst, LongType, nullable = false)
+
+    assert(result1._1.isDefined)
+    assert(result1._1.get.collect().length == 2)
+    assert(result1._1.get.schema == expSchema)
+
+    // valid headers
+    val result2 = FileBasedDataConnector(spark,
+      dcParam.copy(config = configs + (("options", Map(
+        "header" -> "true"
+      )
+      )))
+      , timestampStorage)
+      .data(1L)
+
+    assert(result2._1.isDefined)
+    assert(result2._1.get.collect().length == 1)
+    result2._1.get.columns should contain theSameElementsAs Seq("Joey", "14", 
ConstantColumns.tmst)
+  }
+
+  // skip on erroneous paths
+
+  it should "respect options if an erroneous path is encountered" in {
+    val configs = Map(
+      "format" -> "csv",
+      "paths" -> Seq(
+        s"file://${getClass.getResource("/hive/person_table.csv").getPath}",
+        s"${java.util.UUID.randomUUID().toString}/"
+      ),
+      "skipErrorPaths" -> true,
+      "options" -> Map(
+        "header" -> "true"
+      )
+    )
+
+    // valid paths
+    val result1 = FileBasedDataConnector(spark, dcParam.copy(config = 
configs), timestampStorage).data(1L)
+
+    assert(result1._1.isDefined)
+    assert(result1._1.get.collect().length == 1)
+
+    // non existent path
+    assertThrows[IllegalArgumentException](
+      FileBasedDataConnector(spark, dcParam.copy(config = configs - 
"skipErrorPaths"), timestampStorage).data(1L)
+    )
+
+    // no path
+    assertThrows[AssertionError](
+      FileBasedDataConnector(spark, dcParam.copy(config = configs - "paths"), 
timestampStorage).data(1L)
+    )
+  }
+
+  // Regarding various formats
+  it should "be able to read all supported file types" in {
+
+    val formats = Seq("parquet", "orc", "csv", "tsv")
+    formats.map(f => {
+      val configs = Map(
+        "format" -> f,
+        "paths" -> Seq(
+          s"file://${getClass.getResource(s"/files/person_table.$f").getPath}"
+        ),
+        "options" -> Map(
+          "header" -> "true",
+          "inferSchema" -> "true"
+        )
+      )
+
+      val result = FileBasedDataConnector(spark, dcParam.copy(config = 
configs), timestampStorage).data(1L)
+
+      assert(result._1.isDefined)
+
+      val df = result._1.get
+      val expSchema = new StructType()
+        .add("name", StringType)
+        .add("age", IntegerType, nullable = true)
+        .add(ConstantColumns.tmst, LongType, nullable = false)
+
+      assert(df.collect().length == 2)
+      assert(df.schema == expSchema)
+    })
+  }
+
+  it should "apply schema to all formats if provided" in {
+    val formats = Seq("parquet", "orc", "csv", "tsv")
+    formats.map(f => {
+      val configs = Map(
+        "format" -> f,
+        "paths" -> Seq(
+          s"file://${getClass.getResource(s"/files/person_table.$f").getPath}"
+        ),
+        "options" -> Map(
+          "header" -> "true"
+        ),
+        "schema" -> Seq(Map("name" -> "name", "type" -> "string"))
+      )
+
+      val result = FileBasedDataConnector(spark, dcParam.copy(config = 
configs), timestampStorage).data(1L)
+
+      assert(result._1.isDefined)
+
+      val df = result._1.get
+      val expSchema = new StructType()
+        .add("name", StringType)
+        .add(ConstantColumns.tmst, LongType, nullable = false)
+
+      assert(df.collect().length == 2)
+      assert(df.schema == expSchema)
+    })
+  }
+
+}

Reply via email to