HuangXingBo commented on code in PR #20220:
URL: https://github.com/apache/flink/pull/20220#discussion_r921041412
##########
docs/content/docs/connectors/datastream/formats/csv.md:
##########
@@ -113,6 +115,22 @@ CsvReaderFormat<ComplexPojo> csvFormat =
.build(),
TypeInformation.of(ComplexPojo.class));
```
+
+For PyFlink users, a csv schema can be defined by manually adding columns, and
the output type of the csv source will be a Row with each column mapped to a
field.
+```python
+schema = CsvSchema.builder() \
+ .add_number_column('id', number_type=DataTypes.BIGINT()) \
Review Comment:
Why does `number_type` take a `DataType` here rather than a `TypeInformation`?
##########
flink-python/pyflink/datastream/connectors/tests/test_file_system.py:
##########
@@ -34,6 +35,162 @@
from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
+class FileSourceCsvReaderFormatTests(PyFlinkStreamingTestCase):
+
+ def setUp(self):
+ super().setUp()
+ self.test_sink = DataStreamTestSinkFunction()
+
+ def test_csv_primitive_column(self):
+ csv_file_name = tempfile.mktemp(suffix='.csv', dir=self.tempdir)
Review Comment:
Could this temp-file setup move into `setUp`?
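A rough sketch of what that could look like, in case it helps. It uses plain `unittest.TestCase` so it runs without PyFlink; the class name `CsvFileSetupExample`, the attribute `self.csv_file_name`, and the `TemporaryDirectory` stand-in for `self.tempdir` are all hypothetical and not taken from the PR. It also swaps `tempfile.mktemp` for `os.path.join`, since the directory is already private to the test.

```python
import os
import tempfile
import unittest


class CsvFileSetupExample(unittest.TestCase):
    """Hypothetical stand-in for FileSourceCsvReaderFormatTests."""

    def setUp(self):
        super().setUp()
        # Assumption: stands in for the self.tempdir that the PyFlink
        # test base class provides in the real test.
        self._tmp = tempfile.TemporaryDirectory()
        self.addCleanup(self._tmp.cleanup)
        self.tempdir = self._tmp.name
        # Built once per test in setUp instead of inside every test method.
        self.csv_file_name = os.path.join(self.tempdir, 'test.csv')

    def test_csv_primitive_column(self):
        # Each test only writes its own content to the shared path.
        with open(self.csv_file_name, 'w') as f:
            f.write('1,2.5,text\n')
        with open(self.csv_file_name) as f:
            self.assertEqual(f.read(), '1,2.5,text\n')
```

Since `setUp` runs before every test method, each test still gets a fresh directory and path, so the tests stay independent.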