[
https://issues.apache.org/jira/browse/BEAM-7455?focusedWorklogId=257948&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-257948
]
ASF GitHub Bot logged work on BEAM-7455:
----------------------------------------
Author: ASF GitHub Bot
Created on: 11/Jun/19 17:33
Start Date: 11/Jun/19 17:33
Worklog Time Spent: 10m
Work Description: tvalentyn commented on pull request #8818: [BEAM-7455]
Improve Avro IO integration test coverage on Python 3.
URL: https://github.com/apache/beam/pull/8818#discussion_r292576662
##########
File path: sdks/python/apache_beam/io/avroio_it_test.py
##########
@@ -132,73 +128,54 @@ def batch_indices(start):
return range(start * batch_size, (start + 1) * batch_size)
# A `PCollection` with `num_records` avro records
- records_pcoll = \
- self.test_pipeline \
- | 'create-batches' >> Create(batches) \
- | 'expand-batches' >> FlatMap(batch_indices) \
- | 'create-records' >> Map(record)
-
- fastavro_output = '/'.join([self.output, 'fastavro'])
- avro_output = '/'.join([self.output, 'avro'])
-
- # pylint: disable=expression-not-assigned
- records_pcoll \
- | 'write_fastavro' >> WriteToAvro(
- fastavro_output,
- parse_schema(json.loads(self.SCHEMA_STRING)),
- use_fastavro=True
- )
-
- # pylint: disable=expression-not-assigned
- records_pcoll \
- | 'write_avro' >> WriteToAvro(
- avro_output,
- Parse(self.SCHEMA_STRING),
- use_fastavro=False
- )
-
- result = self.test_pipeline.run()
- result.wait_until_finish()
- assert result.state == PipelineState.DONE
-
- fastavro_read_pipeline = TestPipeline(is_integration_test=True)
-
- fastavro_records = \
- fastavro_read_pipeline \
- | 'create-fastavro' >> Create(['%s*' % fastavro_output]) \
- | 'read-fastavro' >> ReadAllFromAvro(use_fastavro=True) \
- | Map(lambda rec: (rec['number'], rec))
-
- avro_records = \
- fastavro_read_pipeline \
- | 'create-avro' >> Create(['%s*' % avro_output]) \
- | 'read-avro' >> ReadAllFromAvro(use_fastavro=False) \
- | Map(lambda rec: (rec['number'], rec))
-
- def check(elem):
- v = elem[1]
-
- def assertEqual(l, r):
- if l != r:
- raise BeamAssertException('Assertion failed: %s == %s' % (l, r))
-
- assertEqual(v.keys(), ['avro', 'fastavro'])
- avro_values = v['avro']
- fastavro_values = v['fastavro']
- assertEqual(avro_values, fastavro_values)
- assertEqual(len(avro_values), 1)
-
- # pylint: disable=expression-not-assigned
- {
- 'avro': avro_records,
- 'fastavro': fastavro_records
- } \
- | CoGroupByKey() \
- | Map(check)
-
- self.addCleanup(delete_files, [self.output])
- fastavro_read_pipeline.run().wait_until_finish()
- assert result.state == PipelineState.DONE
+ _ = (write_pipeline
+ | 'CreateBatches' >> Create(batches)
+ | 'ExpandBatches' >> FlatMap(batch_indices)
+ | 'CreateRecords' >> Map(record)
+ | 'WriteAvro' >> WriteToAvro(files_output,
+ self.SCHEMA,
+ file_name_suffix=".avro",
+ use_fastavro=self.use_fastavro)
+ )
+
+ write_result = write_pipeline.run()
+ write_result.wait_until_finish()
+ assert write_result.state == PipelineState.DONE
+
+ def check(x, mean):
+ if not x == mean:
+ raise BeamAssertException('Assertion failed: %s == %s' % (x, mean))
+
+ _ = (read_pipeline
+ | "ReadAvro" >> ReadFromAvro(files_output + '*')
+ | 'ExtractNumber' >> Map(lambda x: x['number'])
+ | 'CalculateMean' >> Mean.Globally()
+ | 'MeanIsCorrect' >> Map(
Review comment:
This test only checks the correctness of a value derived from the 'number' field.
Can we instead verify that all attributes of each record are written and read
correctly? Also, comparing floating-point numbers with `==` is not reliable because
of the limited precision of floating-point representation.
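The review comment asks for two changes: compare every attribute of every record instead of a mean over 'number', and stop comparing floats with `==`. The sketch below is plain Python and is not part of the PR; it only illustrates one way to express both checks. Inside the actual Beam test, an order-independent comparison of a PCollection is usually written as `assert_that(pcoll, equal_to(expected))` from `apache_beam.testing.util`, which compares exactly. The helper names `records_match`, `_values_match` and `check_mean`, the `rel_tol` default, and any fields other than 'number' are hypothetical.

```python
import math


def _values_match(expected, actual, rel_tol):
  """Compares one field value; two floats are compared with a tolerance."""
  if isinstance(expected, float) and isinstance(actual, float):
    return math.isclose(expected, actual, rel_tol=rel_tol)
  return expected == actual


def records_match(expected, actual, key='number', rel_tol=1e-9):
  """Order-independent check that every attribute of every record matches.

  Records are paired by `key`, which is assumed to be unique per record.
  """
  expected_by_key = {rec[key]: rec for rec in expected}
  actual_by_key = {rec[key]: rec for rec in actual}
  # Duplicate keys (e.g. a record read twice) make the pairing ambiguous.
  if len(expected_by_key) != len(expected) or len(actual_by_key) != len(actual):
    return False
  if expected_by_key.keys() != actual_by_key.keys():
    return False
  for k, exp_rec in expected_by_key.items():
    act_rec = actual_by_key[k]
    # Same field names first, then the same value for each field.
    if exp_rec.keys() != act_rec.keys():
      return False
    if not all(_values_match(exp_rec[f], act_rec[f], rel_tol) for f in exp_rec):
      return False
  return True


def check_mean(x, mean, rel_tol=1e-9):
  """Tolerance-based variant of the reviewed `check(x, mean)`.

  The PR raises BeamAssertException; a plain AssertionError keeps this
  sketch free of Beam imports.
  """
  if not math.isclose(x, mean, rel_tol=rel_tol):
    raise AssertionError('Assertion failed: %s ~= %s' % (x, mean))
```

Pairing records by 'number' mirrors the removed CoGroupByKey check, which keyed records by that field and expected exactly one record per key.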
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
Issue Time Tracking
-------------------
Worklog Id: (was: 257948)
Time Spent: 50m (was: 40m)
> Improve Avro IO integration test coverage on Python 3.
> ------------------------------------------------------
>
> Key: BEAM-7455
> URL: https://issues.apache.org/jira/browse/BEAM-7455
> Project: Beam
> Issue Type: Sub-task
> Components: io-python-avro
> Reporter: Valentyn Tymofieiev
> Assignee: Frederik Bode
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> It seems that we don't have an integration test for Avro IO on Python 3.
> fastavro_it_test [1] depends on both avro and fastavro; however, the avro
> package currently does not work with Beam on Python 3, so no existing
> integration test exercises Avro IO on Python 3.
> We should add an integration test for Avro IO that does not need both
> libraries at the same time, and instead can run using either library.
> [~frederik] is this something you could help with?
> cc: [~chamikara] [~Juta]
> [1]
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/fastavro_it_test.py
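The issue asks for a test that runs with either library instead of requiring both. One way to structure that, sketched here under assumptions, is a shared test body parameterized by the `use_fastavro` flag that the diff already reads from `self.use_fastavro`, plus one concrete `unittest.TestCase` per backend that skips itself when its library is not importable. The class names and the `BACKEND_MODULES` mapping are hypothetical, and the placeholder assertion stands in for the real write and read pipelines.

```python
import importlib.util
import unittest

# Hypothetical mapping from the `use_fastavro` flag to the backing module.
BACKEND_MODULES = {True: 'fastavro', False: 'avro'}


def backend_available(use_fastavro):
  """Returns True if the library for the requested backend can be imported."""
  return importlib.util.find_spec(BACKEND_MODULES[use_fastavro]) is not None


class AvroRoundTripTestBase(object):
  """Shared test body; concrete subclasses choose a single Avro library."""
  use_fastavro = None

  def setUp(self):
    # Skip instead of failing when only the other library is installed.
    if not backend_available(self.use_fastavro):
      self.skipTest('%s is not installed' % BACKEND_MODULES[self.use_fastavro])

  def test_round_trip(self):
    # Placeholder: the real test would build the write and read pipelines
    # with use_fastavro=self.use_fastavro and compare the records.
    self.assertIn(self.use_fastavro, (True, False))


class FastavroRoundTripTest(AvroRoundTripTestBase, unittest.TestCase):
  use_fastavro = True


class AvroRoundTripTest(AvroRoundTripTestBase, unittest.TestCase):
  use_fastavro = False
```

Keeping the shared body in a mixin that does not inherit from `unittest.TestCase` prevents the test runner from collecting the unparameterized base class as a test of its own.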
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)