This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 190814e  [SPARK-26550][SQL] New built-in datasource - noop
190814e is described below

commit 190814e82eca3da450683685798d470712560a5d
Author: Maxim Gekk <max.g...@gmail.com>
AuthorDate: Wed Jan 16 19:01:58 2019 +0100

    [SPARK-26550][SQL] New built-in datasource - noop
    
    ## What changes were proposed in this pull request?
    
    In the PR, I propose a new built-in datasource named `noop`, which can be used for (see the sketch after this list):
    - benchmarking, to avoid the additional overhead of actions and unnecessary type conversions
    - caching of datasets/dataframes
    - producing other side effects as a consequence of row materialisation, such as uploading data to IO caches.
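
    As a quick illustration (a sketch, not part of the patch; it assumes an existing `spark` session and an invented dataset shape), writing to the `noop` sink materialises every row without paying for a real sink:

    ```scala
    // Benchmark skeleton: time how long full materialisation of the
    // dataset takes when the rows are consumed by the noop sink.
    val start = System.nanoTime()
    spark.range(100000000L)
      .selectExpr("id % 10 AS key", "id AS value")
      .write
      .format("noop")
      .save()
    println(s"Materialised in ${(System.nanoTime() - start) / 1e9} seconds")
    ```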
    
    ## How was this patch tested?
    
    Added tests to check that the datasource materialises all input rows.
    
    Closes #23471 from MaxGekk/none-datasource.
    
    Lead-authored-by: Maxim Gekk <max.g...@gmail.com>
    Co-authored-by: Maxim Gekk <maxim.g...@databricks.com>
    Signed-off-by: Herman van Hovell <hvanhov...@databricks.com>
---
 ...org.apache.spark.sql.sources.DataSourceRegister |  1 +
 .../datasources/noop/NoopDataSource.scala          | 66 ++++++++++++++++++++++
 .../sql/execution/datasources/noop/NoopSuite.scala | 62 ++++++++++++++++++++
 3 files changed, 129 insertions(+)

diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 1b37905..7cdfddc 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1,6 +1,7 @@
 org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+org.apache.spark.sql.execution.datasources.noop.NoopDataSource
 org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 org.apache.spark.sql.execution.datasources.text.TextFileFormat
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
new file mode 100644
index 0000000..79e4c62
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.noop
+
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This is a no-op datasource. It does not do anything besides consuming its input.
+ * This can be useful for benchmarking or for caching data without any additional overhead.
+ */
+class NoopDataSource
+  extends DataSourceV2
+  with TableProvider
+  with DataSourceRegister {
+
+  override def shortName(): String = "noop"
+  override def getTable(options: DataSourceOptions): Table = NoopTable
+}
+
+private[noop] object NoopTable extends Table with SupportsBatchWrite {
+  override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = NoopWriteBuilder
+  override def name(): String = "noop-table"
+  override def schema(): StructType = new StructType()
+}
+
+private[noop] object NoopWriteBuilder extends WriteBuilder with SupportsSaveMode {
+  override def buildForBatch(): BatchWrite = NoopBatchWrite
+  override def mode(mode: SaveMode): WriteBuilder = this
+}
+
+private[noop] object NoopBatchWrite extends BatchWrite {
+  override def createBatchWriterFactory(): DataWriterFactory = NoopWriterFactory
+  override def commit(messages: Array[WriterCommitMessage]): Unit = {}
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {}
+}
+
+private[noop] object NoopWriterFactory extends DataWriterFactory {
+  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = NoopWriter
+}
+
+private[noop] object NoopWriter extends DataWriter[InternalRow] {
+  override def write(record: InternalRow): Unit = {}
+  override def commit(): WriterCommitMessage = null
+  override def abort(): Unit = {}
+}
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/noop/NoopSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/noop/NoopSuite.scala
new file mode 100644
index 0000000..59de286
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/noop/NoopSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.noop
+
+import org.apache.spark.sql.test.SharedSQLContext
+
+class NoopSuite extends SharedSQLContext {
+  import testImplicits._
+
+  test("materialisation of all rows") {
+    val numElems = 10
+    val accum = spark.sparkContext.longAccumulator
+    spark.range(numElems)
+      .map { x =>
+        accum.add(1)
+        x
+      }
+      .write
+      .format("noop")
+      .save()
+    assert(accum.value == numElems)
+  }
+
+  test("read partitioned data") {
+    val numElems = 100
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.range(numElems)
+        .select('id mod 10 as "key", 'id as "value")
+        .write
+        .partitionBy("key")
+        .parquet(path)
+
+      val accum = spark.sparkContext.longAccumulator
+      spark.read
+        .parquet(path)
+        .as[(Long, Long)]
+        .map { x =>
+          accum.add(1)
+          x
+        }
+        .write.format("noop").save()
+      assert(accum.value == numElems)
+    }
+  }
+}
+
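
As a side note on the caching use case from the commit message, a `noop` write can populate a cache eagerly (a sketch, not part of the patch; the input path is hypothetical):

```scala
// cache() is lazy and does not materialise anything by itself.
val df = spark.read.parquet("/path/to/data").cache()
// Writing to the noop sink scans every row exactly once, which fills
// the cache without sending the data anywhere.
df.write.format("noop").save()
```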


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
