dianfu commented on code in PR #20391:
URL: https://github.com/apache/flink/pull/20391#discussion_r934209099
##########
flink-python/pyflink/datastream/connectors/file_system.py:
##########
@@ -599,8 +620,26 @@ def with_output_file_config(self, output_file_config: OutputFileConfig) \
output_file_config._j_output_file_config)
return self
+ def _with_row_data_converter(self, row_type: 'RowType') -> 'FileSink.BulkFormatBuilder':
Review Comment:
What about renaming it to something else, as the given parameter isn't a converter?
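
For illustration, a minimal sketch of what the rename could look like, assuming the builder only needs to remember the row type for the later Row-to-RowData preprocessing; the name `_with_row_type` is purely a hypothetical suggestion, not the name used in the PR:
```python
# Hypothetical sketch only; `_with_row_type` is an illustrative name.
class BulkFormatBuilder(object):  # stub standing in for FileSink.BulkFormatBuilder

    def _with_row_type(self, row_type) -> 'BulkFormatBuilder':
        # Only the row type is stored here; no converter object is passed in,
        # which is what motivates the rename.
        self._row_type = row_type
        return self
```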
##########
flink-python/pyflink/datastream/formats/csv.py:
##########
@@ -293,9 +304,62 @@ def for_schema(schema: 'CsvSchema') -> 'CsvReaderFormat':
Builds a :class:`CsvReaderFormat` using `CsvSchema`.
"""
jvm = get_gateway().jvm
- j_csv_format = jvm.org.apache.flink.formats.csv.CsvReaderFormatFactory \
+ j_csv_format = jvm.org.apache.flink.formats.csv.PythonCsvUtils \
.createCsvReaderFormat(
schema._j_schema,
- _to_java_data_type(schema._data_type)
+ _to_java_data_type(schema._row_type)
)
return CsvReaderFormat(j_csv_format)
+
+
+class CsvBulkWriter(object):
+ """
+ CsvBulkWriter is for building :class:`BulkWriterFactory` to write Rows with a predefined CSV
+ schema to partitioned files in a bulk fashion.
+
+ Example:
+ ::
+
+ >>> schema = CsvSchema.builder() \\
+ ... .add_number_column('id', number_type=DataTypes.INT()) \\
+ ... .add_string_column('name') \\
+ ... .add_array_column('list', ',', element_type=DataTypes.STRING()) \\
+ ... .set_column_separator('|') \\
+ ... .build()
+ >>> sink = FileSink.for_bulk_format(
+ ... OUTPUT_DIR, CsvBulkWriter.for_schema(schema)).build()
+ >>> # If ds is a source stream, an identity map before sink is required
+ >>> ds.map(lambda e: e, output_type=schema.get_type_info()).sink_to(sink)
+
+ .. versionadded:: 1.16.0
+ """
+
+ class Factory(RowDataBulkWriterFactory):
Review Comment:
Use RowDataBulkWriterFactory directly?
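
As a rough sketch only (the constructor signature of `RowDataBulkWriterFactory` and the Java-side factory call are assumptions here, not taken from the PR), returning the shared factory type directly instead of a nested subclass could look like:
```python
class CsvBulkWriter(object):

    @staticmethod
    def for_schema(schema):
        # Hypothetical: assumes a Java-side helper builds the CSV bulk writer
        # factory and that RowDataBulkWriterFactory(j_factory, row_type) is the
        # constructor; no nested Factory subclass is needed in that case.
        j_factory = _create_j_csv_bulk_writer_factory(schema)  # hypothetical helper
        return RowDataBulkWriterFactory(j_factory, schema._row_type)
```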
##########
flink-python/pyflink/datastream/connectors/file_system.py:
##########
@@ -161,6 +167,13 @@ def __init__(self, j_bulk_writer_factory):
super().__init__(j_bulk_writer_factory)
+class RowDataBulkWriterFactory(BulkWriterFactory):
+
Review Comment:
Add some Python doc for this class?
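
A possible wording, as a sketch only; the description should be checked against what the class actually does before adopting it:
```python
class RowDataBulkWriterFactory(BulkWriterFactory):
    """
    A :class:`BulkWriterFactory` whose underlying Java writer consumes RowData
    records, so the sink has to apply a Row-to-RowData preprocessing step
    before writing. (Suggested wording only; adjust to the actual behaviour.)
    """
```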
##########
flink-python/src/main/java/org/apache/flink/python/util/PythonConnectorUtils.java:
##########
@@ -94,4 +100,30 @@ public byte[] serialize(Row row) {
return wrappedSchema.serialize((T) row.getField(1));
}
}
+
+ /** A {@link ProcessFunction} that converts {@link Row} to {@link RowData}. */
+ public static class RowToRowDataProcessFunction extends ProcessFunction<Row, RowData> {
+
+ private static final long serialVersionUID = 1L;
+ private final DataType dataType;
+ private transient RowRowConverter converter;
+
+ public RowToRowDataProcessFunction(DataType dataType) {
+ this.dataType = dataType;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ converter = RowRowConverter.create(dataType);
+ converter.open(RowToRowDataProcessFunction.class.getClassLoader());
Review Comment:
```suggestion
converter.open(getRuntimeContext().getUserCodeClassLoader());
```
##########
flink-python/src/main/java/org/apache/flink/python/util/PythonConnectorUtils.java:
##########
@@ -94,4 +100,30 @@ public byte[] serialize(Row row) {
return wrappedSchema.serialize((T) row.getField(1));
}
}
+
+ /** A {@link ProcessFunction} that converts {@link Row} to {@link RowData}. */
+ public static class RowToRowDataProcessFunction extends ProcessFunction<Row, RowData> {
Review Comment:
```suggestion
public static class RowRowMapper extends RichMapFunction<Row, RowData> {
```
##########
flink-python/pyflink/datastream/connectors/file_system.py:
##########
@@ -526,8 +539,15 @@ class FileSink(Sink):
the checkpoint from which we restore.
"""
- def __init__(self, j_file_sink):
+ def __init__(self, j_file_sink, preprocessing=None):
super(FileSink, self).__init__(sink=j_file_sink)
+ self._preprocessing = preprocessing
+
+ def need_preprocessing(self):
+ return self._preprocessing is not None
+
+ def get_preprocessing(self):
Review Comment:
What about refactoring SupportPreprocessing as follows:
```
def apply(ds):
    pass
```
Then we could simplify the implementation and also avoid introducing TransformAppender.
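
A possible shape of that refactoring, sketched under the assumption that the preprocessing hook simply receives the upstream DataStream and returns the stream to connect to the sink (names other than `apply` are illustrative):
```python
class SupportPreprocessing(object):
    """Mixin for sinks that need to transform the stream before it is attached."""

    def apply(self, ds: 'DataStream') -> 'DataStream':
        # Default: no preprocessing; a sink like FileSink could insert
        # e.g. the Row -> RowData conversion here.
        return ds


# sink_to(...) could then just do (illustrative):
#     if isinstance(sink, SupportPreprocessing):
#         ds = sink.apply(ds)
```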
##########
docs/content/docs/connectors/datastream/formats/csv.md:
##########
@@ -137,3 +137,20 @@ The corresponding CSV file:
```
Similarly to the `TextLineInputFormat`, `CsvReaderFormat` can be used in both continuous and batch modes (see [TextLineInputFormat]({{< ref "docs/connectors/datastream/formats/text_files" >}}) for examples).
+
+For PyFlink users, `CsvBulkWriter` is for creating a `BulkWriterFactory` to write `Row` records to files in CSV format. Note that if the predecessor of the sink is a source stream producing `RowData` records, e.g. a CSV source, an identity map is required to make this work.
Review Comment:
It's not quite clear to me why we need the identity map.