[
https://issues.apache.org/jira/browse/GRIFFIN-297?focusedWorklogId=345294&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-345294
]
ASF GitHub Bot logged work on GRIFFIN-297:
------------------------------------------
Author: ASF GitHub Bot
Created on: 18/Nov/19 13:46
Start Date: 18/Nov/19 13:46
Worklog Time Spent: 10m
Work Description: chitralverma commented on pull request #555: [WIP]
[GRIFFIN-297] Allow support for additional file based data sources
URL: https://github.com/apache/griffin/pull/555#discussion_r347387803
##########
File path:
measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
##########
@@ -0,0 +1,199 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector.batch
+
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType,
StructType}
+import org.scalatest._
+
+import org.apache.griffin.measure.SparkSuiteBase
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.step.builder.ConstantColumns
+
+class FileBasedDataConnectorTest extends SparkSuiteBase with Matchers {
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+
+ createDataSets(s"file://${getClass.getResource("/").getPath}")
+ }
+
+ private def createDataSets(basePath: String): Unit = {
+ val formats = Seq("parquet", "orc", "csv", "tsv")
+ val schema = new StructType().add("name", StringType).add("age",
IntegerType, nullable = true)
+
+ val df = spark.read.schema(schema).csv(s"${basePath}hive/person_table.csv")
+
+ df.cache()
+ formats.foreach(f => {
+ val delimiter = if (f.matches("csv")) "," else if (f.matches("tsv"))
"\t" else ""
+ df.write
+ .mode(SaveMode.Overwrite)
+ .option("delimiter", delimiter)
+ .option("header", "true")
+ .format(if (f.matches("tsv")) "csv" else f)
+ .save(s"${basePath}files/person_table.$f")
+ })
+
+ df.unpersist()
+ }
+
+ private final val dcParam = DataConnectorParam("file", "1", "test_df",
Map.empty[String, String], Nil)
+ private final val timestampStorage = TimestampStorage()
+
+ // Regarding Local FileSystem
+
+ "file based data connector" should "be able to read from local filesystem"
in {
+ val configs = Map(
+ "format" -> "csv",
+ "paths" -> Seq(
+ s"file://${getClass.getResource("/hive/person_table.csv").getPath}"
+ ),
+ "options" -> Map(
+ "header" -> "false"
+ )
+ )
+
+ val dc = FileBasedDataConnector(spark, dcParam.copy(config = configs),
timestampStorage)
+ val result = dc.data(1000L)
+
+ assert(result._1.isDefined)
+ assert(result._1.get.collect().length == 2)
+ }
+
+ // Regarding User Defined Schema
+
+ it should "respect the provided schema, if any" in {
+ val configs = Map(
+ "format" -> "csv",
+ "paths" -> Seq(
+ s"file://${getClass.getResource("/hive/person_table.csv").getPath}"
+ )
+ )
+
+ // no schema
+ assertThrows[IllegalArgumentException](
+ FileBasedDataConnector(spark, dcParam.copy(config = configs),
timestampStorage)
+ )
+
+ // invalid schema
+ assertThrows[IllegalStateException](
+ FileBasedDataConnector(spark, dcParam.copy(config = configs +
(("schema", ""))), timestampStorage)
+ )
+
+ // valid schema
+ val result1 = FileBasedDataConnector(spark,
+ dcParam.copy(config = configs + (("schema",
+ Seq(Map("name" -> "name", "type" -> "string"), Map("name" -> "age",
"type" -> "int", "nullable" -> "true"))
+ )))
+ , timestampStorage)
+ .data(1L)
+
+ val expSchema = new StructType()
+ .add("name", StringType)
+ .add("age", IntegerType, nullable = true)
+ .add(ConstantColumns.tmst, LongType, nullable = false)
+
+ assert(result1._1.isDefined)
+ assert(result1._1.get.collect().length == 2)
+ assert(result1._1.get.schema == expSchema)
+
+ // valid headers
+ val result2 = FileBasedDataConnector(spark,
+ dcParam.copy(config = configs + (("options", Map(
+ "header" -> "true"
+ )
+ )))
+ , timestampStorage)
+ .data(1L)
+
+ assert(result2._1.isDefined)
+ assert(result2._1.get.collect().length == 1)
+ result2._1.get.columns should contain theSameElementsAs Seq("Joey", "14",
ConstantColumns.tmst)
+ }
+
+ // skip on erroneous paths
+
+ it should "respect options if an erroneous path is encountered" in {
+ val configs = Map(
+ "format" -> "csv",
+ "paths" -> Seq(
+ s"file://${getClass.getResource("/hive/person_table.csv").getPath}",
+ s"${java.util.UUID.randomUUID().toString}/"
+ ),
+ "skipErrorPaths" -> true,
+ "options" -> Map(
+ "header" -> "true"
+ )
+ )
+
+ // valid paths
+ val result1 = FileBasedDataConnector(spark, dcParam.copy(config =
configs), timestampStorage).data(1L)
+
+ assert(result1._1.isDefined)
+ assert(result1._1.get.collect().length == 1)
+
+ // non existent path
+ assertThrows[IllegalArgumentException](
+ FileBasedDataConnector(spark, dcParam.copy(config = configs -
"skipErrorPaths"), timestampStorage).data(1L)
+ )
+
+ // no path
+ assertThrows[AssertionError](
+ FileBasedDataConnector(spark, dcParam.copy(config = configs - "paths"),
timestampStorage).data(1L)
+ )
+ }
+
+ // Regarding various formats
+ it should "be able to read all supported file types" in {
+
+ val formats = Seq("parquet", "orc", "csv", "tsv")
+ formats.map(f => {
+ val delimiter = if (f.matches("csv")) "," else if (f.matches("tsv"))
"\t" else ""
+ val configs = Map(
+ "format" -> f,
+ "paths" -> Seq(
+ s"file://${getClass.getResource(s"/files/person_table.$f").getPath}"
+ ),
+ "options" -> Map(
+ "header" -> "true",
+ "inferSchema" -> "true",
+ "delimiter" -> delimiter
Review comment:
Added following defaults:
- default delimiter for TSV format as `\t`,
- default `schema` is `None`,
- `header` is assumed false if not set,
- default `format` is `parquet`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 345294)
Time Spent: 3.5h (was: 3h 20m)
> Allow support for additional file based data sources
> ----------------------------------------------------
>
> Key: GRIFFIN-297
> URL: https://issues.apache.org/jira/browse/GRIFFIN-297
> Project: Griffin
> Issue Type: Improvement
> Reporter: Chitral Verma
> Priority: Major
> Labels: features
> Time Spent: 3.5h
> Remaining Estimate: 0h
>
> In the current version of Apache griffin (0.5.0), there is very limited
> support for file based data sources as only Avro and Text files are
> supported.
> I propose the feature to allow support for additional file based data sources
> like Parquet, CSV, TSV, ORC etc in batch mode. Since most of the above
> sources already have first class support provided by spark, the
> implementation is straight forward.
> Also, this feature will allow data to be read directly from stand alone files
> as well as directories present in both local and distributed filesystems.
> A sample config would look like,
> {noformat}
> {
> "name": "source",
> "baseline": true,
> "connectors": [
> {
> "type": "file",
> "version": "1.7",
> "config": {
> "format": "parquet",
> "options": {
> "k1": "v1",
> "k2": "v2"
> },
> "paths": [
> "/home/chitral/path/to/source/",
> "/home/chitral/path/to/test.parquet"
> ]
> }
> }
> ]
> }{noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)