Repository: spark Updated Branches: refs/heads/master 4f83ca105 -> 93338807a
[SPARK-13792][SQL] Addendum: Fix Python API ## What changes were proposed in this pull request? This is a follow-up to https://github.com/apache/spark/pull/13795 to properly set CSV options in the Python API. As part of this, I also make the Python option setting for both CSV and JSON more robust against positional errors. ## How was this patch tested? N/A Author: Reynold Xin <r...@databricks.com> Closes #13800 from rxin/SPARK-13792-2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93338807 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93338807 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93338807 Branch: refs/heads/master Commit: 93338807aafdb2db9fb036ceadee1467cd367cdd Parents: 4f83ca1 Author: Reynold Xin <r...@databricks.com> Authored: Tue Jun 21 10:47:51 2016 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Tue Jun 21 10:47:51 2016 -0700 ---------------------------------------------------------------------- python/pyspark/sql/readwriter.py | 54 +++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/93338807/python/pyspark/sql/readwriter.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 89506ca..ccbf895 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -77,7 +77,7 @@ class ReaderUtils(object): def _set_csv_opts(self, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, - dateFormat, maxColumns, maxCharsPerColumn, mode): + dateFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode): """ Set options based on the CSV optional parameters """ 
@@ -115,6 +115,8 @@ class ReaderUtils(object): self.option("maxColumns", maxColumns) if maxCharsPerColumn is not None: self.option("maxCharsPerColumn", maxCharsPerColumn) + if maxMalformedLogPerPartition is not None: + self.option("maxMalformedLogPerPartition", maxMalformedLogPerPartition) if mode is not None: self.option("mode", mode) @@ -268,10 +270,12 @@ class DataFrameReader(ReaderUtils): [('age', 'bigint'), ('name', 'string')] """ - self._set_json_opts(schema, primitivesAsString, prefersDecimal, - allowComments, allowUnquotedFieldNames, allowSingleQuotes, - allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, - mode, columnNameOfCorruptRecord) + self._set_json_opts( + schema=schema, primitivesAsString=primitivesAsString, prefersDecimal=prefersDecimal, + allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames, + allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero, + allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter, + mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord) if isinstance(path, basestring): path = [path] if type(path) == list: @@ -343,7 +347,8 @@ class DataFrameReader(ReaderUtils): def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, - negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None, mode=None): + negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None, + maxMalformedLogPerPartition=None, mode=None): """Loads a CSV file and returns the result as a :class:`DataFrame`. 
This function will go through the input once to determine the input schema if @@ -408,11 +413,13 @@ class DataFrameReader(ReaderUtils): >>> df.dtypes [('_c0', 'string'), ('_c1', 'string')] """ - - self._set_csv_opts(schema, sep, encoding, quote, escape, - comment, header, inferSchema, ignoreLeadingWhiteSpace, - ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, - dateFormat, maxColumns, maxCharsPerColumn, mode) + self._set_csv_opts( + schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment, + header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace, + ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue, + nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, + dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, + maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode) if isinstance(path, basestring): path = [path] return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path))) @@ -958,10 +965,12 @@ class DataStreamReader(ReaderUtils): >>> json_sdf.schema == sdf_schema True """ - self._set_json_opts(schema, primitivesAsString, prefersDecimal, - allowComments, allowUnquotedFieldNames, allowSingleQuotes, - allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, - mode, columnNameOfCorruptRecord) + self._set_json_opts( + schema=schema, primitivesAsString=primitivesAsString, prefersDecimal=prefersDecimal, + allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames, + allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero, + allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter, + mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord) if isinstance(path, basestring): return self._df(self._jreader.json(path)) else: @@ -1019,7 +1028,8 @@ class DataStreamReader(ReaderUtils): def csv(self, path, schema=None, sep=None, 
encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, - negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None, mode=None): + negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None, + maxMalformedLogPerPartition=None, mode=None): """Loads a CSV file stream and returns the result as a :class:`DataFrame`. This function will go through the input once to determine the input schema if @@ -1085,11 +1095,13 @@ class DataStreamReader(ReaderUtils): >>> csv_sdf.schema == sdf_schema True """ - - self._set_csv_opts(schema, sep, encoding, quote, escape, - comment, header, inferSchema, ignoreLeadingWhiteSpace, - ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, - dateFormat, maxColumns, maxCharsPerColumn, mode) + self._set_csv_opts( + schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment, + header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace, + ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue, + nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, + dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, + maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode) if isinstance(path, basestring): return self._df(self._jreader.csv(path)) else: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org