This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 433ae90  [SPARK-33566][CORE][SQL][SS][PYTHON] Make unescapedQuoteHandling option configurable when reading CSV
433ae90 is described below

commit 433ae9064f55b8adb27b561e1ff17c32f0bf3465
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Fri Nov 27 15:47:39 2020 +0900

    [SPARK-33566][CORE][SQL][SS][PYTHON] Make unescapedQuoteHandling option configurable when reading CSV

    ### What changes were proposed in this pull request?

    There are differences among Spark CSV, opencsv, and commons-csv; a typical case is described in SPARK-33566: when a value contains both unescaped quotes and an unescaped qualifier, the parsers produce different results. The difference exists because Spark uses `STOP_AT_DELIMITER` as the default `UnescapedQuoteHandling` when building its `CsvParser`, and this is not configurable, whereas opencsv and commons-csv default to a parsing mechanism similar to `STOP_AT_CLOSING_QUOTE`. This PR therefore makes the `unescapedQuoteHandling` option configurable, so that users can obtain the same parsing result as opencsv and commons-csv.

    ### Why are the changes needed?

    Make the unescapedQuoteHandling option configurable when reading CSV, to make parsing more flexible.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    - Pass the Jenkins or GitHub Actions builds
    - Add a new test case similar to the one described in SPARK-33566

    Closes #30518 from LuciferYang/SPARK-33566.

    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 python/pyspark/sql/readwriter.py                   | 26 ++++++++++++++++++++--
 python/pyspark/sql/readwriter.pyi                  |  1 +
 python/pyspark/sql/streaming.py                    | 25 +++++++++++++++++++--
 python/pyspark/sql/streaming.pyi                   |  1 +
 .../apache/spark/sql/catalyst/csv/CSVOptions.scala |  8 ++++++-
 .../org/apache/spark/sql/DataFrameReader.scala     | 21 +++++++++++++++++
 .../spark/sql/streaming/DataStreamReader.scala     | 21 +++++++++++++++++
 .../sql/execution/datasources/csv/CSVSuite.scala   | 24 ++++++++++++++++++++
 8 files changed, 122 insertions(+), 5 deletions(-)
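For illustration (not part of the commit), a minimal self-contained Scala sketch of the new option; the local path, object name, and sample data here are hypothetical, modeled on the SPARK-33566 report:

import org.apache.spark.sql.SparkSession

object UnescapedQuoteHandlingDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]")
      .appName("unescaped-quote-demo").getOrCreate()
    import spark.implicits._

    // A value containing both an unescaped quote and the delimiter:
    // the physical CSV line is  "a,""b,c","xyz"
    // (Spark's default escape character is backslash, so "" is not an escape here.)
    val path = "/tmp/unescaped-quote-demo"  // hypothetical location
    Seq("c1,c2", """"a,""b,c","xyz"""")
      .toDF().repartition(1).write.mode("overwrite").text(path)

    // Default behavior (STOP_AT_DELIMITER): the first value stops at the delimiter.
    spark.read.option("header", "true").csv(path).show(false)

    // opencsv/commons-csv-like behavior: the whole quoted value is kept together.
    spark.read.option("header", "true")
      .option("unescapedQuoteHandling", "STOP_AT_CLOSING_QUOTE")
      .csv(path).show(false)

    spark.stop()
  }
}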
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index bb31e6a..d120daa 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -522,7 +522,8 @@ class DataFrameReader(OptionUtils):
             maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
             columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
             samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
-            pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None):
+            pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None,
+            unescapedQuoteHandling=None):
         r"""Loads a CSV file and returns the result as a :class:`DataFrame`.
 
         This function will go through the input once to determine the input schema if
@@ -685,6 +686,26 @@ class DataFrameReader(OptionUtils):
         modifiedAfter (batch only) : an optional timestamp to only include files with
             modification times occurring after the specified time. The provided timestamp
             must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+        unescapedQuoteHandling : str, optional
+            defines how the CsvParser will handle values with unescaped quotes. If None is
+            set, it uses the default value, ``STOP_AT_DELIMITER``.
+
+            * ``STOP_AT_CLOSING_QUOTE``: If unescaped quotes are found in the input, accumulate
+              the quote character and proceed parsing the value as a quoted value, until a closing
+              quote is found.
+            * ``BACK_TO_DELIMITER``: If unescaped quotes are found in the input, consider the value
+              as an unquoted value. This will make the parser accumulate all characters of the
+              current parsed value until the delimiter is found. If no delimiter is found in the
+              value, the parser will continue accumulating characters from the input until a
+              delimiter or line ending is found.
+            * ``STOP_AT_DELIMITER``: If unescaped quotes are found in the input, consider the value
+              as an unquoted value. This will make the parser accumulate all characters until the
+              delimiter or a line ending is found in the input.
+            * ``SKIP_VALUE``: If unescaped quotes are found in the input, the content parsed
+              for the given value will be skipped and the value set in nullValue will be produced
+              instead.
+            * ``RAISE_ERROR``: If unescaped quotes are found in the input, a TextParsingException
+              will be thrown.
 
         Examples
         --------
@@ -708,7 +729,8 @@ class DataFrameReader(OptionUtils):
             charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
             enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep,
             pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
-            modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter)
+            modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter,
+            unescapedQuoteHandling=unescapedQuoteHandling)
         if isinstance(path, str):
             path = [path]
         if type(path) == list:
diff --git a/python/pyspark/sql/readwriter.pyi b/python/pyspark/sql/readwriter.pyi
index 64c5697..c3b9a42 100644
--- a/python/pyspark/sql/readwriter.pyi
+++ b/python/pyspark/sql/readwriter.pyi
@@ -113,6 +113,7 @@ class DataFrameReader(OptionUtils):
         lineSep: Optional[str] = ...,
         pathGlobFilter: Optional[Union[bool, str]] = ...,
         recursiveFileLookup: Optional[Union[bool, str]] = ...,
+        unescapedQuoteHandling: Optional[str] = ...,
     ) -> DataFrame: ...
     def orc(
         self,
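The mode names above come from univocity's UnescapedQuoteHandling enum, which Spark's CSV data source uses under the hood. As a rough standalone sketch (illustrative only, not part of this commit; object name is hypothetical), the univocity-parsers API can be exercised directly to compare how the same malformed line parses under different modes:

import com.univocity.parsers.csv.{CsvParser, CsvParserSettings, UnescapedQuoteHandling}

object QuoteHandlingModes {
  def main(args: Array[String]): Unit = {
    // The physical CSV line  "a,""b,c","xyz"  contains an unescaped quote
    // in its first value.
    val line = """"a,""b,c","xyz""""

    for (mode <- Seq(
        UnescapedQuoteHandling.STOP_AT_CLOSING_QUOTE,
        UnescapedQuoteHandling.STOP_AT_DELIMITER,
        UnescapedQuoteHandling.SKIP_VALUE)) {
      val settings = new CsvParserSettings()
      settings.setUnescapedQuoteHandling(mode)
      val parsed = new CsvParser(settings).parseLine(line)
      // Print whatever the library produces under each mode.
      println(s"$mode -> ${parsed.mkString("[", " | ", "]")}")
    }
  }
}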
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index e7b2fa1..365b5f3 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -761,7 +761,7 @@ class DataStreamReader(OptionUtils):
             maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
             columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
             enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
-            pathGlobFilter=None, recursiveFileLookup=None):
+            pathGlobFilter=None, recursiveFileLookup=None, unescapedQuoteHandling=None):
         r"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
 
         This function will go through the input once to determine the input schema if
@@ -900,6 +900,26 @@ class DataStreamReader(OptionUtils):
         recursiveFileLookup : str or bool, optional
             recursively scan a directory for files. Using this option disables
             `partition discovery <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery>`_.  # noqa
+        unescapedQuoteHandling : str, optional
+            defines how the CsvParser will handle values with unescaped quotes. If None is
+            set, it uses the default value, ``STOP_AT_DELIMITER``.
+
+            * ``STOP_AT_CLOSING_QUOTE``: If unescaped quotes are found in the input, accumulate
+              the quote character and proceed parsing the value as a quoted value, until a closing
+              quote is found.
+            * ``BACK_TO_DELIMITER``: If unescaped quotes are found in the input, consider the value
+              as an unquoted value. This will make the parser accumulate all characters of the
+              current parsed value until the delimiter is found. If no delimiter is found in the
+              value, the parser will continue accumulating characters from the input until a
+              delimiter or line ending is found.
+            * ``STOP_AT_DELIMITER``: If unescaped quotes are found in the input, consider the value
+              as an unquoted value. This will make the parser accumulate all characters until the
+              delimiter or a line ending is found in the input.
+            * ``SKIP_VALUE``: If unescaped quotes are found in the input, the content parsed
+              for the given value will be skipped and the value set in nullValue will be produced
+              instead.
+            * ``RAISE_ERROR``: If unescaped quotes are found in the input, a TextParsingException
+              will be thrown.
 
         .. versionadded:: 2.0.0
 
@@ -926,7 +946,8 @@ class DataStreamReader(OptionUtils):
             columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
             charToEscapeQuoteEscaping=charToEscapeQuoteEscaping,
             enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep,
-            pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup)
+            pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
+            unescapedQuoteHandling=unescapedQuoteHandling)
         if isinstance(path, str):
             return self._df(self._jreader.csv(path))
         else:
diff --git a/python/pyspark/sql/streaming.pyi b/python/pyspark/sql/streaming.pyi
index 56ce140..829610a 100644
--- a/python/pyspark/sql/streaming.pyi
+++ b/python/pyspark/sql/streaming.pyi
@@ -149,6 +149,7 @@ class DataStreamReader(OptionUtils):
         lineSep: Optional[str] = ...,
         pathGlobFilter: Optional[Union[bool, str]] = ...,
         recursiveFileLookup: Optional[Union[bool, str]] = ...,
+        unescapedQuoteHandling: Optional[str] = ...,
     ) -> DataFrame: ...
 
 class DataStreamWriter:
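The streaming reader accepts the same option. A hedged Scala sketch of how it might be used (illustrative, not part of the commit; the watched input directory and object name are hypothetical, and streaming file sources require an explicit schema):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

object StreamingCsvDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]")
      .appName("streaming-csv-demo").getOrCreate()

    // Streaming file sources cannot infer the schema per micro-batch.
    val schema = new StructType()
      .add("c1", StringType)
      .add("c2", StringType)

    val parsed = spark.readStream
      .schema(schema)
      .option("header", "true")
      .option("unescapedQuoteHandling", "STOP_AT_CLOSING_QUOTE")
      .csv("/tmp/streaming-csv-input")  // hypothetical directory watched for new files

    // Echo parsed rows to the console as new files arrive.
    parsed.writeStream.format("console").start().awaitTermination()
  }
}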
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index f2191fc..ec40599 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -213,6 +213,12 @@ class CSVOptions(
   }
 
   val lineSeparatorInWrite: Option[String] = lineSeparator
+  /**
+   * The handling method to be used when unescaped quotes are found in the input.
+   */
+  val unescapedQuoteHandling: UnescapedQuoteHandling = UnescapedQuoteHandling.valueOf(parameters
+    .getOrElse("unescapedQuoteHandling", "STOP_AT_DELIMITER").toUpperCase(Locale.ROOT))
+
   def asWriterSettings: CsvWriterSettings = {
     val writerSettings = new CsvWriterSettings()
     val format = writerSettings.getFormat
@@ -258,7 +264,7 @@ class CSVOptions(
     settings.setNullValue(nullValue)
     settings.setEmptyValue(emptyValueInRead)
     settings.setMaxCharsPerColumn(maxCharsPerColumn)
-    settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
+    settings.setUnescapedQuoteHandling(unescapedQuoteHandling)
     settings.setLineSeparatorDetectionEnabled(lineSeparatorInRead.isEmpty && multiLine)
     lineSeparatorInRead.foreach { _ =>
       settings.setNormalizeLineEndingsWithinQuotes(!multiLine)
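One consequence of the lookup above: UnescapedQuoteHandling.valueOf is strict, so the option is case-insensitive (via toUpperCase(Locale.ROOT)) but rejects unknown names. A minimal standalone sketch of the same logic (hypothetical helper, not part of the patch):

import java.util.Locale

import com.univocity.parsers.csv.UnescapedQuoteHandling

object OptionLookupSketch {
  // Mirrors the CSVOptions lookup above for a plain Map of options.
  def lookup(parameters: Map[String, String]): UnescapedQuoteHandling =
    UnescapedQuoteHandling.valueOf(parameters
      .getOrElse("unescapedQuoteHandling", "STOP_AT_DELIMITER").toUpperCase(Locale.ROOT))

  def main(args: Array[String]): Unit = {
    println(lookup(Map.empty))                                      // STOP_AT_DELIMITER
    println(lookup(Map("unescapedQuoteHandling" -> "skip_value")))  // SKIP_VALUE
    // lookup(Map("unescapedQuoteHandling" -> "bogus"))             // throws IllegalArgumentException
  }
}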
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index b26bc64..8f96f0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -727,6 +727,27 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
    * for any given value being read. By default, it is -1 meaning unlimited length</li>
+   * <li>`unescapedQuoteHandling` (default `STOP_AT_DELIMITER`): defines how the CsvParser
+   * will handle values with unescaped quotes.
+   *   <ul>
+   *     <li>`STOP_AT_CLOSING_QUOTE`: If unescaped quotes are found in the input, accumulate
+   *     the quote character and proceed parsing the value as a quoted value, until a closing
+   *     quote is found.</li>
+   *     <li>`BACK_TO_DELIMITER`: If unescaped quotes are found in the input, consider the value
+   *     as an unquoted value. This will make the parser accumulate all characters of the current
+   *     parsed value until the delimiter is found. If no delimiter is found in the value, the
+   *     parser will continue accumulating characters from the input until a delimiter or line
+   *     ending is found.</li>
+   *     <li>`STOP_AT_DELIMITER`: If unescaped quotes are found in the input, consider the value
+   *     as an unquoted value. This will make the parser accumulate all characters until the
+   *     delimiter or a line ending is found in the input.</li>
+   *     <li>`SKIP_VALUE`: If unescaped quotes are found in the input, the content parsed
+   *     for the given value will be skipped and the value set in nullValue will be produced
+   *     instead.</li>
+   *     <li>`RAISE_ERROR`: If unescaped quotes are found in the input, a TextParsingException
+   *     will be thrown.</li>
+   *   </ul>
+   * </li>
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    * during parsing. It supports the following case-insensitive modes. Note that Spark tries
    * to parse only required columns in CSV under column pruning. Therefore, corrupt records
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 9bc4acd..7f4ef8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -396,6 +396,27 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
    * for any given value being read. By default, it is -1 meaning unlimited length</li>
+   * <li>`unescapedQuoteHandling` (default `STOP_AT_DELIMITER`): defines how the CsvParser
+   * will handle values with unescaped quotes.
+   *   <ul>
+   *     <li>`STOP_AT_CLOSING_QUOTE`: If unescaped quotes are found in the input, accumulate
+   *     the quote character and proceed parsing the value as a quoted value, until a closing
+   *     quote is found.</li>
+   *     <li>`BACK_TO_DELIMITER`: If unescaped quotes are found in the input, consider the value
+   *     as an unquoted value. This will make the parser accumulate all characters of the current
+   *     parsed value until the delimiter is found. If no delimiter is found in the value, the
+   *     parser will continue accumulating characters from the input until a delimiter or line
+   *     ending is found.</li>
+   *     <li>`STOP_AT_DELIMITER`: If unescaped quotes are found in the input, consider the value
+   *     as an unquoted value. This will make the parser accumulate all characters until the
+   *     delimiter or a line ending is found in the input.</li>
+   *     <li>`SKIP_VALUE`: If unescaped quotes are found in the input, the content parsed
+   *     for the given value will be skipped and the value set in nullValue will be produced
+   *     instead.</li>
+   *     <li>`RAISE_ERROR`: If unescaped quotes are found in the input, a TextParsingException
+   *     will be thrown.</li>
+   *   </ul>
+   * </li>
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    * during parsing. It supports the following case-insensitive modes.
    * <ul>
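Of the five modes, RAISE_ERROR is the only one that refuses malformed input outright. A small illustrative sketch of the documented failure (again against the univocity-parsers API directly, not part of this commit; object name is hypothetical):

import com.univocity.parsers.common.TextParsingException
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings, UnescapedQuoteHandling}

object RaiseErrorDemo {
  def main(args: Array[String]): Unit = {
    val settings = new CsvParserSettings()
    settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.RAISE_ERROR)
    try {
      // The unescaped quote in the first value should trigger the error.
      new CsvParser(settings).parseLine(""""a,""b,c","xyz"""")
      println("parsed without error")
    } catch {
      case e: TextParsingException =>
        println(s"rejected as documented: ${e.getClass.getSimpleName}")
    }
  }
}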
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index a236814..30f0e45 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -2428,6 +2428,30 @@ abstract class CSVSuite
       assert(readback.collect sameElements Array(Row("0"), Row("1"), Row("2")))
     }
   }
+
+  test("SPARK-33566: configure UnescapedQuoteHandling to parse " +
+    "unescaped quotes and unescaped delimiter data correctly") {
+    withTempPath { path =>
+      val dataPath = path.getCanonicalPath
+      val row1 = Row("""a,""b,c""", "xyz")
+      val row2 = Row("""a,b,c""", """x""yz""")
+      // Generate test data that uses `,` as delimiter and `"` as quote, without escaping them.
+      Seq(
+        """c1,c2""",
+        s""""${row1.getString(0)}","${row1.getString(1)}"""",
+        s""""${row2.getString(0)}","${row2.getString(1)}"""")
+        .toDF().repartition(1).write.text(dataPath)
+      // Without setting unescapedQuoteHandling to STOP_AT_CLOSING_QUOTE, the result
+      // would be Row(""""a,""b""", """c""""), Row("""a,b,c""", """"x""yz"""")
+      val result = spark.read
+        .option("inferSchema", "true")
+        .option("header", "true")
+        .option("unescapedQuoteHandling", "STOP_AT_CLOSING_QUOTE")
+        .csv(dataPath).collect()
+      val expectedResults = Array(row1, row2)
+      assert(result.sameElements(expectedResults))
+    }
+  }
 }
 
 class CSVv1Suite extends CSVSuite {