This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch 
revert-30770-python-sdk-avro-beam-schema-conversion-bugfixes
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 0dfdcc2ca10ac9caa9facc002ea181c52424ba9c
Author: Yi Hu <huu...@gmail.com>
AuthorDate: Thu Apr 25 14:04:07 2024 -0400

    Revert "python sdk: fix several bugs regarding avto <-> beam schema 
conversio…"
    
    This reverts commit 45e78572e8f7399c3e455eeb53fe762ddf8e833e.
---
 sdks/python/apache_beam/io/avroio.py      | 132 ++++--------------------------
 sdks/python/apache_beam/io/avroio_test.py |  84 +------------------
 2 files changed, 18 insertions(+), 198 deletions(-)

diff --git a/sdks/python/apache_beam/io/avroio.py 
b/sdks/python/apache_beam/io/avroio.py
index 8b7958a00b8..24df59ddc5c 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -43,7 +43,6 @@ that can be used to write a given ``PCollection`` of Python 
objects to an
 Avro file.
 """
 # pytype: skip-file
-import ctypes
 import os
 from functools import partial
 from typing import Any
@@ -545,35 +544,12 @@ BEAM_PRIMITIVES_TO_AVRO_PRIMITIVES = {
 _AvroSchemaType = Union[str, List, Dict]
 
 
-def avro_union_type_to_beam_type(union_type: List) -> schema_pb2.FieldType:
-  """convert an avro union type to a beam type
-
-  if the union type is a nullable, and it is a nullable union of an avro
-  primitive with a corresponding beam primitive then create a nullable beam
-  field of the corresponding beam type, otherwise return an Any type.
-
-  Args:
-    union_type: the avro union type to convert.
-
-  Returns:
-    the beam type of the avro union.
-  """
-  if len(union_type) == 2 and "null" in union_type:
-    for avro_type in union_type:
-      if avro_type in AVRO_PRIMITIVES_TO_BEAM_PRIMITIVES:
-        return schema_pb2.FieldType(
-            atomic_type=AVRO_PRIMITIVES_TO_BEAM_PRIMITIVES[avro_type],
-            nullable=True)
-    return schemas.typing_to_runner_api(Any)
-  return schemas.typing_to_runner_api(Any)
-
-
 def avro_type_to_beam_type(avro_type: _AvroSchemaType) -> schema_pb2.FieldType:
   if isinstance(avro_type, str):
     return avro_type_to_beam_type({'type': avro_type})
   elif isinstance(avro_type, list):
     # Union type
-    return avro_union_type_to_beam_type(avro_type)
+    return schemas.typing_to_runner_api(Any)
   type_name = avro_type['type']
   if type_name in AVRO_PRIMITIVES_TO_BEAM_PRIMITIVES:
     return schema_pb2.FieldType(
@@ -629,37 +605,11 @@ def avro_dict_to_beam_row(
           to_row)
 
 
-def avro_atomic_value_to_beam_atomic_value(avro_type: str, value):
-  """convert an avro atomic value to a beam atomic value
-
-  if the avro type is an int or long, convert the value into from signed to
-  unsigned because VarInt.java expects the number to be unsigned when
-  decoding the number.
-
-  Args:
-    avro_type: the avro type of the corresponding value.
-    value: the avro atomic value.
-
-  Returns:
-    the converted beam atomic value.
-  """
-  if value is None:
-    return value
-  elif avro_type == "int":
-    return ctypes.c_uint32(value).value
-  elif avro_type == "long":
-    return ctypes.c_uint64(value).value
-  else:
-    return value
-
-
 def avro_value_to_beam_value(
     beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]:
   type_info = beam_type.WhichOneof("type_info")
   if type_info == "atomic_type":
-    avro_type = BEAM_PRIMITIVES_TO_AVRO_PRIMITIVES[beam_type.atomic_type]
-    return lambda value: avro_atomic_value_to_beam_atomic_value(
-        avro_type, value)
+    return lambda value: value
   elif type_info == "array_type":
     element_converter = avro_value_to_beam_value(
         beam_type.array_type.element_type)
@@ -671,7 +621,7 @@ def avro_value_to_beam_value(
   elif type_info == "map_type":
     if beam_type.map_type.key_type.atomic_type != schema_pb2.STRING:
       raise TypeError(
-          f'Only strings allowed as map keys when converting from AVRO, '
+          f'Only strings allowd as map keys when converting from AVRO, '
           f'found {beam_type}')
     value_converter = avro_value_to_beam_value(beam_type.map_type.value_type)
     return lambda value: {k: value_converter(v) for (k, v) in value.items()}
@@ -696,63 +646,39 @@ def beam_schema_to_avro_schema(
       schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema)))
 
 
-def unnest_primitive_type(beam_type: schema_pb2.FieldType):
-  """unnests beam types that map to avro primitives or unions.
-      
-      if mapping to a avro primitive or a union, don't nest the field type
-      for complex types, like arrays, we need to nest the type.
-      Example: { 'type': 'string' } -> 'string'
-      { 'type': 'array', 'items': 'string' } 
-      -> { 'type': 'array', 'items': 'string' }
-
-      Args:
-        beam_type: the beam type to map to avro.
-
-      Returns:
-        the converted avro type with the primitive or union type unnested.
-      """
-  avro_type = beam_type_to_avro_type(beam_type)
-  return avro_type['type'] if beam_type.WhichOneof(
-      "type_info") == "atomic_type" else avro_type
-
-
 def beam_type_to_avro_type(beam_type: schema_pb2.FieldType) -> _AvroSchemaType:
   type_info = beam_type.WhichOneof("type_info")
   if type_info == "atomic_type":
-    avro_primitive = BEAM_PRIMITIVES_TO_AVRO_PRIMITIVES[beam_type.atomic_type]
-    avro_type = [
-        avro_primitive, 'null'
-    ] if beam_type.nullable else avro_primitive
-    return {'type': avro_type}
+    return {'type': BEAM_PRIMITIVES_TO_AVRO_PRIMITIVES[beam_type.atomic_type]}
   elif type_info == "array_type":
     return {
         'type': 'array',
-        'items': unnest_primitive_type(beam_type.array_type.element_type)
+        'items': beam_type_to_avro_type(beam_type.array_type.element_type)
     }
   elif type_info == "iterable_type":
     return {
         'type': 'array',
-        'items': unnest_primitive_type(beam_type.iterable_type.element_type)
+        'items': beam_type_to_avro_type(beam_type.iterable_type.element_type)
     }
   elif type_info == "map_type":
     if beam_type.map_type.key_type.atomic_type != schema_pb2.STRING:
       raise TypeError(
-          f'Only strings allowed as map keys when converting to AVRO, '
+          f'Only strings allowd as map keys when converting to AVRO, '
           f'found {beam_type}')
     return {
         'type': 'map',
-        'values': unnest_primitive_type(beam_type.map_type.element_type)
+        'values': beam_type_to_avro_type(beam_type.map_type.element_type)
     }
   elif type_info == "row_type":
     return {
         'type': 'record',
         'name': beam_type.row_type.schema.id,
         'fields': [{
-            'name': field.name, 'type': unnest_primitive_type(field.type)
+            'name': field.name, 'type': beam_type_to_avro_type(field.type)
         } for field in beam_type.row_type.schema.fields],
     }
   else:
-    raise ValueError(f"Unconvertable type: {beam_type}")
+    raise ValueError(f"Unconvertale type: {beam_type}")
 
 
 def beam_row_to_avro_dict(
@@ -767,55 +693,29 @@ def beam_row_to_avro_dict(
     return lambda row: convert(row[0])
 
 
-def beam_atomic_value_to_avro_atomic_value(avro_type: str, value):
-  """convert a beam atomic value to an avro atomic value
-
-  since numeric values are converted to unsigned in
-  avro_atomic_value_to_beam_atomic_value we need to convert
-  back to a signed number.
-
-  Args:
-    avro_type: avro type of the corresponding value.
-    value: the beam atomic value.
-
-  Returns:
-    the converted avro atomic value.
-  """
-  if value is None:
-    return value
-  elif avro_type == "int":
-    return ctypes.c_int32(value).value
-  elif avro_type == "long":
-    return ctypes.c_int64(value).value
-  else:
-    return value
-
-
 def beam_value_to_avro_value(
     beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]:
   type_info = beam_type.WhichOneof("type_info")
   if type_info == "atomic_type":
-    avro_type = BEAM_PRIMITIVES_TO_AVRO_PRIMITIVES[beam_type.atomic_type]
-    return lambda value: beam_atomic_value_to_avro_atomic_value(
-        avro_type, value)
+    return lambda value: value
   elif type_info == "array_type":
-    element_converter = beam_value_to_avro_value(
+    element_converter = avro_value_to_beam_value(
         beam_type.array_type.element_type)
     return lambda value: [element_converter(e) for e in value]
   elif type_info == "iterable_type":
-    element_converter = beam_value_to_avro_value(
+    element_converter = avro_value_to_beam_value(
         beam_type.iterable_type.element_type)
     return lambda value: [element_converter(e) for e in value]
   elif type_info == "map_type":
     if beam_type.map_type.key_type.atomic_type != schema_pb2.STRING:
       raise TypeError(
-          f'Only strings allowed as map keys when converting from AVRO, '
+          f'Only strings allowd as map keys when converting from AVRO, '
           f'found {beam_type}')
-    value_converter = beam_value_to_avro_value(beam_type.map_type.value_type)
+    value_converter = avro_value_to_beam_value(beam_type.map_type.value_type)
     return lambda value: {k: value_converter(v) for (k, v) in value.items()}
   elif type_info == "row_type":
     converters = {
-        field.name: beam_value_to_avro_value(field.type)
+        field.name: avro_value_to_beam_value(field.type)
         for field in beam_type.row_type.schema.fields
     }
     return lambda value: {
diff --git a/sdks/python/apache_beam/io/avroio_test.py 
b/sdks/python/apache_beam/io/avroio_test.py
index c95fbb61259..c54ac40711b 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -20,41 +20,31 @@ import json
 import logging
 import math
 import os
-import pytest
 import tempfile
 import unittest
-from typing import List, Any
+from typing import List
 
-import fastavro
 import hamcrest as hc
 
 from fastavro.schema import parse_schema
 from fastavro import writer
 
 import apache_beam as beam
-from apache_beam import Create, schema_pb2
+from apache_beam import Create
 from apache_beam.io import avroio
 from apache_beam.io import filebasedsource
 from apache_beam.io import iobase
 from apache_beam.io import source_test_utils
 from apache_beam.io.avroio import _FastAvroSource  # For testing
-from apache_beam.io.avroio import avro_schema_to_beam_schema  # For testing
-from apache_beam.io.avroio import beam_schema_to_avro_schema  # For testing
-from apache_beam.io.avroio import avro_atomic_value_to_beam_atomic_value  # 
For testing
-from apache_beam.io.avroio import avro_union_type_to_beam_type  # For testing
-from apache_beam.io.avroio import beam_atomic_value_to_avro_atomic_value  # 
For testing
 from apache_beam.io.avroio import _create_avro_sink  # For testing
 from apache_beam.io.filesystems import FileSystems
-from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.transforms.sql import SqlTransform
 from apache_beam.transforms.userstate import CombiningValueStateSpec
 from apache_beam.utils.timestamp import Timestamp
-from apache_beam.typehints import schemas
 
 # Import snappy optionally; some tests will be skipped when import fails.
 try:
@@ -170,76 +160,6 @@ class AvroBase(object):
             | beam.Map(stable_repr))
         assert_that(readback, equal_to([stable_repr(r) for r in rows]))
 
-  @pytest.mark.xlang_sql_expansion_service
-  @unittest.skipIf(
-      TestPipeline().get_pipeline_options().view_as(StandardOptions).runner is
-      None,
-      "Must be run with a runner that supports staging java artifacts.")
-  def test_avro_schema_to_beam_schema_with_nullable_atomic_fields(self):
-    records = []
-    records.extend(self.RECORDS)
-    records.append({
-        'name': 'Bruce', 'favorite_number': None, 'favorite_color': None
-    })
-    with tempfile.TemporaryDirectory() as tmp_dirname_input:
-      input_path = os.path.join(tmp_dirname_input, 'tmp_filename.avro')
-      parsed_schema = fastavro.parse_schema(json.loads(self.SCHEMA_STRING))
-      with open(input_path, 'wb') as tmp_avro_file:
-        fastavro.writer(tmp_avro_file, parsed_schema, records)
-
-      with tempfile.TemporaryDirectory() as tmp_dirname_output:
-
-        with TestPipeline() as p:
-          _ = (
-              p
-              | avroio.ReadFromAvro(input_path, as_rows=True)
-              | SqlTransform("SELECT * FROM PCOLLECTION")
-              | avroio.WriteToAvro(tmp_dirname_output))
-        with TestPipeline() as p:
-          readback = (p | avroio.ReadFromAvro(tmp_dirname_output + "*"))
-          assert_that(readback, equal_to(records))
-
-  def test_avro_atomic_value_to_beam_atomic_value(self):
-    input_outputs = [('int', 1, 1), ('int', -1, 0xffffffff),
-                     ('int', None, None), ('long', 1, 1),
-                     ('long', -1, 0xffffffffffffffff), ('long', None, None),
-                     ('string', 'foo', 'foo')]
-    for test_avro_type, test_value, expected_value in input_outputs:
-      actual_value = avro_atomic_value_to_beam_atomic_value(
-          test_avro_type, test_value)
-      hc.assert_that(actual_value, hc.equal_to(expected_value))
-
-  def test_beam_atomic_value_to_avro_atomic_value(self):
-    input_outputs = [('int', 1, 1), ('int', 0xffffffff, -1),
-                     ('int', None, None), ('long', 1, 1),
-                     ('long', 0xffffffffffffffff, -1), ('long', None, None),
-                     ('string', 'foo', 'foo')]
-    for test_avro_type, test_value, expected_value in input_outputs:
-      actual_value = beam_atomic_value_to_avro_atomic_value(
-          test_avro_type, test_value)
-      hc.assert_that(actual_value, hc.equal_to(expected_value))
-
-  def test_avro_union_type_to_beam_type_with_nullable_long(self):
-    union_type = ['null', 'long']
-    beam_type = avro_union_type_to_beam_type(union_type)
-    expected_beam_type = schema_pb2.FieldType(
-        atomic_type=schema_pb2.INT64, nullable=True)
-    hc.assert_that(beam_type, hc.equal_to(expected_beam_type))
-
-  def test_avro_union_type_to_beam_type_with_string_long(self):
-    union_type = ['string', 'long']
-    beam_type = avro_union_type_to_beam_type(union_type)
-    expected_beam_type = schemas.typing_to_runner_api(Any)
-    hc.assert_that(beam_type, hc.equal_to(expected_beam_type))
-
-  def test_avro_schema_to_beam_and_back(self):
-    avro_schema = fastavro.parse_schema(json.loads(self.SCHEMA_STRING))
-    beam_schema = avro_schema_to_beam_schema(avro_schema)
-    converted_avro_schema = beam_schema_to_avro_schema(beam_schema)
-    expected_fields = json.loads(self.SCHEMA_STRING)["fields"]
-    hc.assert_that(
-        converted_avro_schema["fields"], hc.equal_to(expected_fields))
-
   def test_read_without_splitting(self):
     file_name = self._write_data()
     expected_result = self.RECORDS

Reply via email to