ahmedabu98 commented on code in PR #30770:
URL: https://github.com/apache/beam/pull/30770#discussion_r1543554920
##########
sdks/python/apache_beam/io/avroio.py:
##########
@@ -649,7 +684,11 @@ def beam_schema_to_avro_schema(
def beam_type_to_avro_type(beam_type: schema_pb2.FieldType) -> _AvroSchemaType:
type_info = beam_type.WhichOneof("type_info")
if type_info == "atomic_type":
- return {'type': BEAM_PRIMITIVES_TO_AVRO_PRIMITIVES[beam_type.atomic_type]}
+ avro_primitive = BEAM_PRIMITIVES_TO_AVRO_PRIMITIVES[beam_type.atomic_type]
+ if beam_type.nullable:
+ return ['null', avro_primitive]
Review Comment:
Wondering if this should be:
```python
return {'type': ['null', avro_primitive]}
```
##########
sdks/python/apache_beam/io/avroio_test.py:
##########
@@ -149,17 +151,37 @@ def _run_avro_test(
def test_schema_read_write(self):
with tempfile.TemporaryDirectory() as tmp_dirname:
path = os.path.join(tmp_dirname, 'tmp_filename')
- rows = [beam.Row(a=1, b=['x', 'y']), beam.Row(a=2, b=['t', 'u'])]
+ rows = [beam.Row(a=-1, b=['x', 'y']), beam.Row(a=2, b=['t', 'u'])]
stable_repr = lambda row: json.dumps(row._asdict())
with TestPipeline() as p:
_ = p | Create(rows) | avroio.WriteToAvro(path) | beam.Map(print)
with TestPipeline() as p:
readback = (
p
| avroio.ReadFromAvro(path + '*', as_rows=True)
+ | SqlTransform("SELECT * FROM PCOLLECTION")
Review Comment:
If we're using SqlTransform in a test, we should mark that test with
`@pytest.mark.xlang_sql_expansion_service` (because it's an external transform
that depends on the expansion service jar to be built).
I'd prefer that we keep the original test unchanged and add a duplicate test
that includes the SqlTransform step.
##########
sdks/python/apache_beam/io/avroio_test.py:
##########
@@ -149,17 +151,37 @@ def _run_avro_test(
def test_schema_read_write(self):
with tempfile.TemporaryDirectory() as tmp_dirname:
path = os.path.join(tmp_dirname, 'tmp_filename')
- rows = [beam.Row(a=1, b=['x', 'y']), beam.Row(a=2, b=['t', 'u'])]
+ rows = [beam.Row(a=-1, b=['x', 'y']), beam.Row(a=2, b=['t', 'u'])]
stable_repr = lambda row: json.dumps(row._asdict())
with TestPipeline() as p:
_ = p | Create(rows) | avroio.WriteToAvro(path) | beam.Map(print)
with TestPipeline() as p:
readback = (
p
| avroio.ReadFromAvro(path + '*', as_rows=True)
+ | SqlTransform("SELECT * FROM PCOLLECTION")
| beam.Map(stable_repr))
assert_that(readback, equal_to([stable_repr(r) for r in rows]))
+ def test_avro_schema_to_beam_schema_with_nullable_atomic_fields(self):
Review Comment:
Similarly, add `@pytest.mark.xlang_sql_expansion_service` here as well.
##########
sdks/python/apache_beam/io/avroio_test.py:
##########
@@ -149,17 +151,37 @@ def _run_avro_test(
def test_schema_read_write(self):
with tempfile.TemporaryDirectory() as tmp_dirname:
path = os.path.join(tmp_dirname, 'tmp_filename')
- rows = [beam.Row(a=1, b=['x', 'y']), beam.Row(a=2, b=['t', 'u'])]
+ rows = [beam.Row(a=-1, b=['x', 'y']), beam.Row(a=2, b=['t', 'u'])]
stable_repr = lambda row: json.dumps(row._asdict())
with TestPipeline() as p:
_ = p | Create(rows) | avroio.WriteToAvro(path) | beam.Map(print)
with TestPipeline() as p:
readback = (
p
| avroio.ReadFromAvro(path + '*', as_rows=True)
+ | SqlTransform("SELECT * FROM PCOLLECTION")
| beam.Map(stable_repr))
assert_that(readback, equal_to([stable_repr(r) for r in rows]))
+ def test_avro_schema_to_beam_schema_with_nullable_atomic_fields(self):
+ with tempfile.TemporaryDirectory() as tmp_dirname_input:
+ input_path = os.path.join(tmp_dirname_input, 'tmp_filename.avro')
+ parsed_schema = fastavro.parse_schema(json.loads(self.SCHEMA_STRING))
+ with open(input_path, 'wb') as tmp_avro_file:
+ fastavro.writer(tmp_avro_file, parsed_schema, self.RECORDS)
+
+ with tempfile.TemporaryDirectory() as tmp_dirname_output:
+
+ with TestPipeline() as p:
+ _ = (
+ p
+ | avroio.ReadFromAvro(input_path, as_rows=True)
+ | SqlTransform("SELECT * FROM PCOLLECTION")
+ | avroio.WriteToAvro(tmp_dirname_output))
+ with TestPipeline() as p:
+ readback = (p | avroio.ReadFromAvro(tmp_dirname_output + "*"))
+ assert_that(readback, equal_to(RECORDS))
Review Comment:
Heads up: I think `RECORDS` doesn't actually contain any records with null values.
It may be good to include some if we're testing nullability conversion here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]