[ https://issues.apache.org/jira/browse/FLINK-38797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wren updated FLINK-38797:
-------------------------
    Description: 
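The [CsvSchemaBuilder.set_null_value|https://github.com/apache/flink/blob/d7206187f52be9b03569aa68a6c5647f180dc25f/flink-python/pyflink/datastream/formats/csv.py#L235-L239] method is missing a final {{return self}} statement. Because it implicitly returns {{None}}, it cannot be chained with the other builder methods. For example, {{...set_null_value(null_value).build()}} raises {{AttributeError: 'NoneType' object has no attribute 'build'}}.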
{code:python}
    def set_null_value(self, null_value: str):
        """
        Set literal for null value, default to empty sequence.
        """
        self._j_schema_builder.setNullValue(null_value)
{code}
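
A minimal sketch of the proposed fix, runnable without PyFlink or a JVM. Both classes are hypothetical stand-ins: {{_StandInJavaBuilder}} replaces the Java builder held in {{_j_schema_builder}}, and {{SketchCsvSchemaBuilder}} is a trimmed-down builder. Only the body of {{set_null_value}} mirrors the real method, with {{return self}} added.
{code:python}
from typing import List, Optional


class _StandInJavaBuilder:
    """Hypothetical stand-in for the Java CsvSchema builder; it only records calls."""

    def __init__(self) -> None:
        self.null_value: Optional[str] = None
        self.columns: List[str] = []

    def setNullValue(self, null_value: str) -> "_StandInJavaBuilder":
        self.null_value = null_value
        return self


class SketchCsvSchemaBuilder:
    """Hypothetical, trimmed-down builder used only to illustrate the fix."""

    def __init__(self) -> None:
        self._j_schema_builder = _StandInJavaBuilder()

    def add_string_column(self, name: str) -> "SketchCsvSchemaBuilder":
        self._j_schema_builder.columns.append(name)
        return self

    def set_null_value(self, null_value: str) -> "SketchCsvSchemaBuilder":
        """
        Set literal for null value, default to empty sequence.
        """
        self._j_schema_builder.setNullValue(null_value)
        return self  # the missing line: return the builder so calls can be chained

    def build(self) -> dict:
        # Stand-in for building a CsvSchema: just expose what was configured.
        return {"columns": list(self._j_schema_builder.columns),
                "null_value": self._j_schema_builder.null_value}


schema = SketchCsvSchemaBuilder() \
    .add_string_column("col1") \
    .set_null_value("") \
    .build()
print(schema)  # {'columns': ['col1'], 'null_value': ''}
{code}
Returning {{self}} matches the other builder methods such as {{add_string_column}}, which is what allows the whole schema to be declared in one expression.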

Until the bug is fixed, a workaround is to call the underlying Java builder directly:
{code:python}
from pyflink.datastream.formats.csv import CsvSchema, CsvSchemaBuilder

null_value: str = "" # default null value
schema_builder: CsvSchemaBuilder = CsvSchema.builder() \
    .add_string_column("col1") \
    .add_string_column("col2") \
    .add_string_column("col3")
schema_builder._j_schema_builder.setNullValue(null_value)  # additional line needed because of the bug
schema: CsvSchema = schema_builder.build()
{code}

Once the bug is fixed, the schema above could be built in a single chain:
{code:python}
from pyflink.datastream.formats.csv import CsvSchema, CsvSchemaBuilder

null_value: str = '' # default null value
schema: CsvSchema = CsvSchema.builder() \
    .add_string_column("col1") \
    .add_string_column("col2") \
    .add_string_column("col3") \
    .set_null_value(null_value) \
    .build()
{code}
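
Until an upgrade with the fix is available, one option (assuming patching library code at runtime is acceptable in your project) is to wrap the method so it returns the builder, which lets call sites use the chained form above today. {{patch_to_return_self}} is a hypothetical helper, not part of PyFlink; it is demonstrated on a stand-in class so the sketch runs without PyFlink, and the commented lines show how it would be applied to {{CsvSchemaBuilder}}.
{code:python}
import functools


def patch_to_return_self(cls: type, method_name: str) -> None:
    """Hypothetical helper: wrap cls.<method_name> so a None result becomes self."""
    original = getattr(cls, method_name)

    @functools.wraps(original)
    def wrapper(self, *args, **kwargs):
        result = original(self, *args, **kwargs)
        # If the method already returns something (e.g. after the upstream fix), keep it.
        return self if result is None else result

    setattr(cls, method_name, wrapper)


class _BuggyBuilder:
    """Stand-in with the same bug: set_null_value() implicitly returns None."""

    def __init__(self) -> None:
        self.null_value = None

    def set_null_value(self, null_value: str) -> None:
        self.null_value = null_value


patch_to_return_self(_BuggyBuilder, "set_null_value")
builder = _BuggyBuilder()
print(builder.set_null_value("") is builder)  # True

# Applied to PyFlink (assumption: run once at startup, before any schema is built):
# from pyflink.datastream.formats.csv import CsvSchemaBuilder
# patch_to_return_self(CsvSchemaBuilder, "set_null_value")
{code}
Compared with calling {{_j_schema_builder}} directly, this keeps call sites unchanged after upgrading; only the patch itself needs to be deleted.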

Possible unit tests to add, modeled on the existing tests in {{test_csv.py}}:
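{code:python}
# test_csv.py already imports these; repeated here so the snippet is complete.
from typing import List, Tuple

from pyflink.datastream.formats.csv import CsvSchema


class FileSourceCsvReaderFormatTests(object):
    # New method for the existing mixin; env, test_sink and _build_csv_job()
    # come from the existing test setup in test_csv.py.

    def test_csv_default_null_value(self):
        schema, lines = _create_csv_default_null_value_schema_and_lines()
        self._build_csv_job(schema, lines)
        self.env.execute('test_csv_default_null_value')
        _check_csv_default_null_value_schema_and_lines(
            self, self.test_sink.get_results(True, False))


def _create_csv_default_null_value_schema_and_lines() -> Tuple[CsvSchema, List[str]]:
    # Chaining set_null_value() into build() only works once the method returns self.
    schema = CsvSchema.builder() \
        .add_string_column('string') \
        .add_number_column('number') \
        .set_null_value('') \
        .build()
    # The first field is empty, so it should be read back as null.
    lines = [
        ',123\n'
    ]
    return schema, lines


def _check_csv_default_null_value_schema_and_lines(test, results):
    row = results[0]
    test.assertIsNone(row['string'])
    test.assertEqual(row['number'], 123)
{code}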

  was:
The [CsvSchemaBuilder.set_null_value|https://github.com/apache/flink/blob/d7206187f52be9b03569aa68a6c5647f180dc25f/flink-python/pyflink/datastream/formats/csv.py#L235-L239] method is missing a final {{return self}} statement, which it needs in order to be chained like the other builder methods.
{code:python}
    def set_null_value(self, null_value: str):
        """
        Set literal for null value, default to empty sequence.
        """
        self._j_schema_builder.setNullValue(null_value)
{code}
Until the bug is fixed, a workaround is to call the underlying Java builder directly:
{code:python}
from pyflink.datastream.formats.csv import CsvSchema, CsvSchemaBuilder

null_value: str = "" # default null value
schema_builder: CsvSchemaBuilder = CsvSchema.builder() \
    .add_string_column("col1") \
    .add_string_column("col2") \
    .add_string_column("col3")
schema_builder._j_schema_builder.setNullValue(null_value)  # additional line needed because of the bug
schema: CsvSchema = schema_builder.build()
{code}
Once the bug is fixed, the schema above could be built in a single chain:
{code:python}
from pyflink.datastream.formats.csv import CsvSchema, CsvSchemaBuilder

null_value: str = '' # default null value
schema: CsvSchema = CsvSchema.builder() \
    .add_string_column("col1") \
    .add_string_column("col2") \
    .add_string_column("col3") \
    .set_null_value(null_value) \
    .build()
{code}


> PyFlink CsvSchemaBuilder.set_null_value method missing return self
> ------------------------------------------------------------------
>
>                 Key: FLINK-38797
>                 URL: https://issues.apache.org/jira/browse/FLINK-38797
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>            Reporter: Wren
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
