This is an automated email from the ASF dual-hosted git repository. bossenti pushed a commit to branch fix/#1245-temporary-fix in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit d3ad1e6e8d0de49878a8d73c860751fc7398bd8c Author: bossenti <[email protected]> AuthorDate: Fri Feb 10 21:27:47 2023 +0100 [#1245] provide a temporary workaround for inconsistency in eventGrounding --- .../streampipes/model/common.py | 3 +- .../streampipes/model/resource/data_stream.py | 29 +++++++++++ .../tests/client/test_endpoint.py | 2 +- streampipes-client-python/tests/model/__init__.py | 0 .../tests/model/resource/__init__.py | 0 .../tests/model/resource/test_data_stream.py | 56 ++++++++++++++++++++++ 6 files changed, 88 insertions(+), 2 deletions(-) diff --git a/streampipes-client-python/streampipes/model/common.py b/streampipes-client-python/streampipes/model/common.py index c72cba3d8..fb7bcd797 100644 --- a/streampipes-client-python/streampipes/model/common.py +++ b/streampipes-client-python/streampipes/model/common.py @@ -47,6 +47,7 @@ class BasicModel(BaseModel): """ alias_generator = _snake_to_camel_case + allow_population_by_field_name = True class BaseElement(BasicModel): @@ -127,7 +128,7 @@ class TransportProtocol(BasicModel): element_id: Optional[StrictStr] broker_hostname: StrictStr topic_definition: TopicDefinition - port: StrictInt + port: StrictInt = Field(alias="kafkaPort") class TransportFormat(BasicModel): diff --git a/streampipes-client-python/streampipes/model/resource/data_stream.py b/streampipes-client-python/streampipes/model/resource/data_stream.py index 073160d02..8e3a5a67a 100644 --- a/streampipes-client-python/streampipes/model/resource/data_stream.py +++ b/streampipes-client-python/streampipes/model/resource/data_stream.py @@ -95,3 +95,32 @@ class DataStream(Resource): uri: Optional[StrictStr] dom: Optional[StrictStr] rev: Optional[StrictStr] = Field(alias="_rev") + + def to_dict(self, use_source_names=True): + """Returns the resource in dictionary representation. + + Parameters + ---------- + use_source_names: bool + Indicates if the dictionary keys are in python representation or + equally named to the StreamPipes backend + + Returns + ------ + resource: Dict[str, Any] + The resource as dictionary representation + + """ + + # This serves as a temporary fix for https://github.com/apache/streampipes/issues/1245 + # should be removed as soon as possible + + resource_dict = self.dict(by_alias=use_source_names) + + if use_source_names and (transport_protocol_dict := resource_dict["eventGrounding"]["transportProtocols"][0])[ + "@class"] != "org.apache.streampipes.model.grounding.KafkaTransportProtocol": + port = transport_protocol_dict.pop("kafkaPort") + transport_protocol_dict.update({"port": port}) + resource_dict["eventGrounding"]["transportProtocols"][0] = transport_protocol_dict + + return resource_dict diff --git a/streampipes-client-python/tests/client/test_endpoint.py b/streampipes-client-python/tests/client/test_endpoint.py index c0c8d870c..16a2b6d9d 100644 --- a/streampipes-client-python/tests/client/test_endpoint.py +++ b/streampipes-client-python/tests/client/test_endpoint.py @@ -212,7 +212,7 @@ class TestStreamPipesEndpoints(TestCase): any_order=True, ) self.assertTrue(isinstance(result, DataStream)) - self.assertEqual(result.dict(by_alias=True), self.data_stream_get) + self.assertEqual(result.to_dict(use_source_names=True), self.data_stream_get) @patch("streampipes.client.client.Session", autospec=True) def test_endpoint_post(self, http_session: MagicMock): diff --git a/streampipes-client-python/tests/model/__init__.py b/streampipes-client-python/tests/model/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/streampipes-client-python/tests/model/resource/__init__.py b/streampipes-client-python/tests/model/resource/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/streampipes-client-python/tests/model/resource/test_data_stream.py b/streampipes-client-python/tests/model/resource/test_data_stream.py new file mode 100644 index 000000000..3688df468 --- /dev/null +++ b/streampipes-client-python/tests/model/resource/test_data_stream.py @@ -0,0 +1,56 @@ +from unittest import TestCase + +from streampipes.model.resource import DataStream + + +class TestDataStreamWorkaround(TestCase): + """ + Testcase that assures behavior of workaround introduced + as a temporary fix for https://github.com/apache/streampipes/issues/1245 + Needs to be removed as soon as possible + """ + + def test_nats_case(self): + data_stream_def = { + "elementId": "some-random-id", + "eventGrounding": { + "transportProtocols": [ + { + "@class": "org.apache.streampipes.model.grounding.NatsTransportProtocol", + "brokerHostname": "broker-host-name", + "topicDefinition": { + "@class": "some-class-name", + "actualTopicName": "actual-topic-name" + }, + "port": 50 + } + ] + } + } + + data_stream = DataStream.parse_obj(data_stream_def) + + self.assertEqual(50, data_stream.to_dict()["eventGrounding"]["transportProtocols"][0]["port"]) + + def test_kafka_case(self): + + data_stream_def = { + "elementId": "some-random-id", + "eventGrounding": { + "transportProtocols": [ + { + "@class": "org.apache.streampipes.model.grounding.KafkaTransportProtocol", + "brokerHostname": "broker-host-name", + "topicDefinition": { + "@class": "some-class-name", + "actualTopicName": "actual-topic-name" + }, + "kafkaPort": 50 + } + ] + } + } + + data_stream = DataStream.parse_obj(data_stream_def) + + self.assertEqual(50, data_stream.to_dict()["eventGrounding"]["transportProtocols"][0]["kafkaPort"])
