BryceCicada commented on a change in pull request #12824:
URL: https://github.com/apache/flink/pull/12824#discussion_r450066743



##########
File path: flink-python/pyflink/table/sources.py
##########
@@ -38,16 +38,62 @@ class CsvTableSource(TableSource):
     (logically) unlimited number of fields.
 
     :param source_path: The path to the CSV file.
+    :type source_path: str
     :param field_names: The names of the table fields.
+    :type field_names: collections.Iterable[str]
     :param field_types: The types of the table fields.
+    :type field_types: collections.Iterable[str]
+    :param field_delim: The field delimiter, "," by default.
+    :type field_delim: str, optional
+    :param line_delim: The row delimiter, "\\n" by default.
+    :type line_delim: str, optional
+    :param quote_character: An optional quote character for String values, 
null by default.
+    :type quote_character: str, optional
+    :param ignore_first_line: Flag to ignore the first line, false by default.
+    :type ignore_first_line: bool, optional
+    :param ignore_comments: An optional prefix to indicate comments, null by 
default.
+    :type ignore_comments: str, optional
+    :param lenient: Flag to skip records with parse error instead to fail, 
false by default.
+    :type lenient: bool, optional
     """
 
-    def __init__(self, source_path, field_names, field_types):
+    def __init__(
+        self,
+        source_path,
+        field_names,
+        field_types,
+        field_delim=None,
+        line_delim=None,
+        quote_character=None,
+        ignore_first_line=None,
+        ignore_comments=None,
+        lenient=None,
+    ):
         # type: (str, list[str], list[DataType]) -> None
         gateway = get_gateway()
-        j_field_names = utils.to_jarray(gateway.jvm.String, field_names)
-        j_field_types = utils.to_jarray(gateway.jvm.TypeInformation,
-                                        [_to_java_type(field_type)
-                                         for field_type in field_types])
-        super(CsvTableSource, self).__init__(
-            gateway.jvm.CsvTableSource(source_path, j_field_names, 
j_field_types))
+
+        builder = gateway.jvm.CsvTableSource.builder()
+        builder.path(source_path)
+
+        for (field_name, field_type) in zip(field_names, field_types):
+            builder.field(field_name, _to_java_type(field_type))
+
+        if field_delim is not None:
+            builder.fieldDelimiter(field_delim)
+
+        if line_delim is not None:
+            builder.lineDelimiter(line_delim)
+
+        if quote_character is not None:
+            builder.quoteCharacter(quote_character)

Review comment:
       Interestingly, it works OK: Py4J appears to select the first 
character of the Python string as the Java Character.
   
   But yes, this behaviour is surprising, so it would make sense for me to 
add validation that `len(quote_character) == 1`.
   
   UPDATE: Here's the relevant Py4J code:
https://github.com/bartdag/py4j/blob/4152353ac142a7c6d177e0d8f5d420d92c846a30/py4j-java/src/main/java/py4j/reflection/TypeConverter.java#L85




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to