This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 190814e  [SPARK-26550][SQL] New built-in datasource - noop
190814e is described below

commit 190814e82eca3da450683685798d470712560a5d
Author: Maxim Gekk <max.g...@gmail.com>
AuthorDate: Wed Jan 16 19:01:58 2019 +0100

    [SPARK-26550][SQL] New built-in datasource - noop

    ## What changes were proposed in this pull request?

    In this PR, I propose a new built-in datasource named `noop`, which can be used for:
    - benchmarking, to avoid the additional overhead of actions and unnecessary type conversions
    - caching of datasets/dataframes
    - producing other side effects as a consequence of row materialisation, such as uploading data to IO caches

    ## How was this patch tested?

    Added a test to check that datasource rows are materialised.

    Closes #23471 from MaxGekk/none-datasource.

    Lead-authored-by: Maxim Gekk <max.g...@gmail.com>
    Co-authored-by: Maxim Gekk <maxim.g...@databricks.com>
    Signed-off-by: Herman van Hovell <hvanhov...@databricks.com>
---
 ...org.apache.spark.sql.sources.DataSourceRegister |  1 +
 .../datasources/noop/NoopDataSource.scala          | 66 ++++++++++++++++++++++
 .../sql/execution/datasources/noop/NoopSuite.scala | 62 ++++++++++++++++++++
 3 files changed, 129 insertions(+)

diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 1b37905..7cdfddc 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1,6 +1,7 @@
 org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+org.apache.spark.sql.execution.datasources.noop.NoopDataSource
 org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 org.apache.spark.sql.execution.datasources.text.TextFileFormat
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
new file mode 100644
index 0000000..79e4c62
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.noop
+
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This is a no-op datasource. It does not do anything besides consuming its input.
+ * This can be useful for benchmarking or for caching data without any additional overhead.
+ */
+class NoopDataSource
+  extends DataSourceV2
+  with TableProvider
+  with DataSourceRegister {
+
+  override def shortName(): String = "noop"
+  override def getTable(options: DataSourceOptions): Table = NoopTable
+}
+
+private[noop] object NoopTable extends Table with SupportsBatchWrite {
+  override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = NoopWriteBuilder
+  override def name(): String = "noop-table"
+  override def schema(): StructType = new StructType()
+}
+
+private[noop] object NoopWriteBuilder extends WriteBuilder with SupportsSaveMode {
+  override def buildForBatch(): BatchWrite = NoopBatchWrite
+  override def mode(mode: SaveMode): WriteBuilder = this
+}
+
+private[noop] object NoopBatchWrite extends BatchWrite {
+  override def createBatchWriterFactory(): DataWriterFactory = NoopWriterFactory
+  override def commit(messages: Array[WriterCommitMessage]): Unit = {}
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {}
+}
+
+private[noop] object NoopWriterFactory extends DataWriterFactory {
+  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = NoopWriter
+}
+
+private[noop] object NoopWriter extends DataWriter[InternalRow] {
+  override def write(record: InternalRow): Unit = {}
+  override def commit(): WriterCommitMessage = null
+  override def abort(): Unit = {}
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/noop/NoopSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/noop/NoopSuite.scala
new file mode 100644
index 0000000..59de286
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/noop/NoopSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.noop
+
+import org.apache.spark.sql.test.SharedSQLContext
+
+class NoopSuite extends SharedSQLContext {
+  import testImplicits._
+
+  test("materialisation of all rows") {
+    val numElems = 10
+    val accum = spark.sparkContext.longAccumulator
+    spark.range(numElems)
+      .map { x =>
+        accum.add(1)
+        x
+      }
+      .write
+      .format("noop")
+      .save()
+    assert(accum.value == numElems)
+  }
+
+  test("read partitioned data") {
+    val numElems = 100
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.range(numElems)
+        .select('id mod 10 as "key", 'id as "value")
+        .write
+        .partitionBy("key")
+        .parquet(path)
+
+      val accum = spark.sparkContext.longAccumulator
+      spark.read
+        .parquet(path)
+        .as[(Long, Long)]
+        .map { x =>
+          accum.add(1)
+          x
+        }
+        .write.format("noop").save()
+      assert(accum.value == numElems)
+    }
+  }
+}
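
For reference, a minimal usage sketch (not part of the commit) showing how the new source is driven from user code for benchmarking; it assumes an active SparkSession bound to `spark`:

    // Example only: force full materialisation of a query without the overhead
    // of an action such as count() or of writing bytes to a real sink.
    // Every produced row is handed to NoopWriter.write and discarded.
    spark.range(100000000L)
      .selectExpr("id % 7 AS bucket", "id AS value")  // arbitrary projection for the example
      .write
      .format("noop")  // resolved to NoopDataSource via the DataSourceRegister service file
      .save()

Since NoopWriter.commit returns null and the commit/abort hooks in NoopBatchWrite are empty, the measured wall-clock time reflects only row production plus the per-row write call.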