This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d50603a [SPARK-27271][SQL] Migrate Text to File Data Source V2
d50603a is described below
commit d50603a37c478b299e5bd5c1e04cf2f65e108d1e
Author: Gengliang Wang <[email protected]>
AuthorDate: Mon Apr 8 10:15:22 2019 -0700
[SPARK-27271][SQL] Migrate Text to File Data Source V2
## What changes were proposed in this pull request?
Migrate the built-in Text data source to the File Data Source V2 framework.
## How was this patch tested?
Unit test
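For context, here is a minimal sketch of how the migrated source is exercised; the session setup and paths are illustrative assumptions, not part of this patch:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative local session; master, app name, and paths are assumptions.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("TextV2Demo")
  .getOrCreate()

// Reads now resolve the "text" short name to TextDataSourceV2
// (registered via META-INF/services) instead of TextFileFormat.
val df = spark.read.text("/tmp/input.txt")
df.show()

// Writes still fall back to the V1 path by default, because this patch
// adds "text" to spark.sql.sources.write.disabledV2Writers ("csv,orc,text").
df.write.mode("overwrite").text("/tmp/output")
```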
Closes #24207 from gengliangwang/textV2.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 2 +-
...org.apache.spark.sql.sources.DataSourceRegister | 2 +-
.../datasources/text/TextFileFormat.scala | 29 ---------
.../execution/datasources/text/TextOptions.scala | 2 +-
.../datasources/text/TextOutputWriter.scala | 54 ++++++++++++++++
.../datasources/v2/text/TextDataSourceV2.scala | 44 +++++++++++++
.../v2/text/TextPartitionReaderFactory.scala | 73 ++++++++++++++++++++++
.../execution/datasources/v2/text/TextScan.scala | 61 ++++++++++++++++++
.../datasources/v2/text/TextScanBuilder.scala | 38 +++++++++++
.../execution/datasources/v2/text/TextTable.scala | 48 ++++++++++++++
.../datasources/v2/text/TextWriteBuilder.scala | 70 +++++++++++++++++++++
.../spark/sql/hive/MetastoreDataSourcesSuite.scala | 2 +-
12 files changed, 392 insertions(+), 33 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 71a49c2..157be1b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1488,7 +1488,7 @@ object SQLConf {
" register class names for which data source V2 write paths are
disabled. Writes from these" +
" sources will fall back to the V1 sources.")
.stringConf
- .createWithDefault("csv,orc")
+ .createWithDefault("csv,orc,text")
val DISABLED_V2_STREAMING_WRITERS =
buildConf("spark.sql.streaming.disabledV2Writers")
.doc("A comma-separated list of fully qualified data source register class
names for which" +
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index b686187..be9cb81 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -4,7 +4,7 @@ org.apache.spark.sql.execution.datasources.json.JsonFileFormat
org.apache.spark.sql.execution.datasources.noop.NoopDataSource
org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-org.apache.spark.sql.execution.datasources.text.TextFileFormat
+org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index 60756e7..d8811c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.execution.datasources.text
-import java.io.OutputStream
-
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -143,30 +141,3 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
dataType == StringType
}
-class TextOutputWriter(
- path: String,
- dataSchema: StructType,
- lineSeparator: Array[Byte],
- context: TaskAttemptContext)
- extends OutputWriter {
-
- private var outputStream: Option[OutputStream] = None
-
- override def write(row: InternalRow): Unit = {
- val os = outputStream.getOrElse {
- val newStream = CodecStreams.createOutputStream(context, new Path(path))
- outputStream = Some(newStream)
- newStream
- }
-
- if (!row.isNullAt(0)) {
- val utf8string = row.getUTF8String(0)
- utf8string.writeTo(os)
- }
- os.write(lineSeparator)
- }
-
- override def close(): Unit = {
- outputStream.foreach(_.close())
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
index e4e2019..ef13216 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs
/**
* Options for the Text data source.
*/
-private[text] class TextOptions(@transient private val parameters: CaseInsensitiveMap[String])
+class TextOptions(@transient private val parameters: CaseInsensitiveMap[String])
extends Serializable {
import TextOptions._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOutputWriter.scala
new file mode 100644
index 0000000..faf6e57
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOutputWriter.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.text
+
+import java.io.OutputStream
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter}
+import org.apache.spark.sql.types.StructType
+
+class TextOutputWriter(
+ path: String,
+ dataSchema: StructType,
+ lineSeparator: Array[Byte],
+ context: TaskAttemptContext)
+ extends OutputWriter {
+
+ private var outputStream: Option[OutputStream] = None
+
+ override def write(row: InternalRow): Unit = {
+ val os = outputStream.getOrElse {
+ val newStream = CodecStreams.createOutputStream(context, new Path(path))
+ outputStream = Some(newStream)
+ newStream
+ }
+
+ if (!row.isNullAt(0)) {
+ val utf8string = row.getUTF8String(0)
+ utf8string.writeTo(os)
+ }
+ os.write(lineSeparator)
+ }
+
+ override def close(): Unit = {
+ outputStream.foreach(_.close())
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala
new file mode 100644
index 0000000..f6aa1e9
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.text
+
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.text.TextFileFormat
+import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
+import org.apache.spark.sql.sources.v2.Table
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class TextDataSourceV2 extends FileDataSourceV2 {
+
+ override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[TextFileFormat]
+
+ override def shortName(): String = "text"
+
+ override def getTable(options: CaseInsensitiveStringMap): Table = {
+ val paths = getPaths(options)
+ val tableName = getTableName(paths)
+ TextTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
+ }
+
+ override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
+ val paths = getPaths(options)
+ val tableName = getTableName(paths)
+ TextTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
+ }
+}
+
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextPartitionReaderFactory.scala
new file mode 100644
index 0000000..8788887
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextPartitionReaderFactory.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.text
+
+import org.apache.spark.TaskContext
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
+import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, HadoopFileWholeTextReader, PartitionedFile}
+import org.apache.spark.sql.execution.datasources.text.TextOptions
+import org.apache.spark.sql.execution.datasources.v2._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.reader.PartitionReader
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A factory used to create Text readers.
+ *
+ * @param sqlConf SQL configuration.
+ * @param broadcastedConf Broadcasted serializable Hadoop Configuration.
+ * @param readDataSchema Required schema in the batch scan.
+ * @param partitionSchema Schema of partitions.
+ * @param textOptions Options for reading a text file.
+ */
+case class TextPartitionReaderFactory(
+ sqlConf: SQLConf,
+ broadcastedConf: Broadcast[SerializableConfiguration],
+ readDataSchema: StructType,
+ partitionSchema: StructType,
+ textOptions: TextOptions) extends FilePartitionReaderFactory {
+
+ override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = {
+ val confValue = broadcastedConf.value.value
+ val reader = if (!textOptions.wholeText) {
+ new HadoopFileLinesReader(file, textOptions.lineSeparatorInRead, confValue)
+ } else {
+ new HadoopFileWholeTextReader(file, confValue)
+ }
+ Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => reader.close()))
+ val iter = if (readDataSchema.isEmpty) {
+ val emptyUnsafeRow = new UnsafeRow(0)
+ reader.map(_ => emptyUnsafeRow)
+ } else {
+ val unsafeRowWriter = new UnsafeRowWriter(1)
+
+ reader.map { line =>
+ // Writes to an UnsafeRow directly
+ unsafeRowWriter.reset()
+ unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
+ unsafeRowWriter.getRow()
+ }
+ }
+ val fileReader = new PartitionReaderFromIterator[InternalRow](iter)
+ new PartitionReaderWithPartitionValues(fileReader, readDataSchema,
+ partitionSchema, file.partitionValues)
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
new file mode 100644
index 0000000..202723d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.text
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.apache.spark.sql.execution.datasources.text.TextOptions
+import org.apache.spark.sql.execution.datasources.v2.TextBasedFileScan
+import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.SerializableConfiguration
+
+case class TextScan(
+ sparkSession: SparkSession,
+ fileIndex: PartitioningAwareFileIndex,
+ readDataSchema: StructType,
+ readPartitionSchema: StructType,
+ options: CaseInsensitiveStringMap)
+ extends TextBasedFileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema, options) {
+
+ private val optionsAsScala = options.asScala.toMap
+ private lazy val textOptions: TextOptions = new TextOptions(optionsAsScala)
+
+ override def isSplitable(path: Path): Boolean = {
+ super.isSplitable(path) && !textOptions.wholeText
+ }
+
+ override def createReaderFactory(): PartitionReaderFactory = {
+ assert(
+ readDataSchema.length <= 1,
+ "Text data source only produces a single data column named \"value\".")
+ val hadoopConf = {
+ val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+ // Hadoop Configurations are case sensitive.
+ sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+ }
+ val broadcastedConf = sparkSession.sparkContext.broadcast(
+ new SerializableConfiguration(hadoopConf))
+ TextPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, readDataSchema,
+ readPartitionSchema, textOptions)
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScanBuilder.scala
new file mode 100644
index 0000000..fbe5e16
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScanBuilder.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2.text
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
+import org.apache.spark.sql.sources.v2.reader.Scan
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+case class TextScanBuilder(
+ sparkSession: SparkSession,
+ fileIndex: PartitioningAwareFileIndex,
+ schema: StructType,
+ dataSchema: StructType,
+ options: CaseInsensitiveStringMap)
+ extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
+
+ override def build(): Scan = {
+ TextScan(sparkSession, fileIndex, readDataSchema(), readPartitionSchema(), options)
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
new file mode 100644
index 0000000..b8cb61a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.text
+
+import org.apache.hadoop.fs.FileStatus
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.v2.FileTable
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder
+import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+case class TextTable(
+ name: String,
+ sparkSession: SparkSession,
+ options: CaseInsensitiveStringMap,
+ paths: Seq[String],
+ userSpecifiedSchema: Option[StructType],
+ fallbackFileFormat: Class[_ <: FileFormat])
+ extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+ override def newScanBuilder(options: CaseInsensitiveStringMap): TextScanBuilder =
+ TextScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+
+ override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
+ Some(StructType(Seq(StructField("value", StringType))))
+
+ override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
+ new TextWriteBuilder(options, paths, formatName, supportsDataType)
+
+ override def supportsDataType(dataType: DataType): Boolean = dataType == StringType
+
+ override def formatName: String = "Text"
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWriteBuilder.scala
new file mode 100644
index 0000000..c00dbc2
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWriteBuilder.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.text
+
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.execution.datasources.text.{TextOptions, TextOutputWriter}
+import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class TextWriteBuilder(
+ options: CaseInsensitiveStringMap,
+ paths: Seq[String],
+ formatName: String,
+ supportsDataType: DataType => Boolean)
+ extends FileWriteBuilder(options, paths, formatName, supportsDataType) {
+ private def verifySchema(schema: StructType): Unit = {
+ if (schema.size != 1) {
+ throw new AnalysisException(
+ s"Text data source supports only a single column, and you have
${schema.size} columns.")
+ }
+ }
+
+ override def prepareWrite(
+ sqlConf: SQLConf,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+ verifySchema(dataSchema)
+
+ val textOptions = new TextOptions(options)
+ val conf = job.getConfiguration
+
+ textOptions.compressionCodec.foreach { codec =>
+ CompressionCodecs.setCodecConfiguration(conf, codec)
+ }
+
+ new OutputWriterFactory {
+ override def newInstance(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ new TextOutputWriter(path, dataSchema, textOptions.lineSeparatorInWrite, context)
+ }
+
+ override def getFileExtension(context: TaskAttemptContext): String = {
+ ".txt" + CodecStreams.getCompressionExtension(context)
+ }
+ }
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 3402ed2..c9ff4fc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -930,7 +930,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}
assert(e.getMessage.contains(
"The format of the existing table default.appendTextToJson is
`JsonFileFormat`. " +
- "It doesn't match the specified format `TextFileFormat`"))
+ "It doesn't match the specified format"))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]