This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch fix/#1245-temporary-fix
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit d3ad1e6e8d0de49878a8d73c860751fc7398bd8c
Author: bossenti <[email protected]>
AuthorDate: Fri Feb 10 21:27:47 2023 +0100

    [#1245] provide a temporary workaround for inconsistency in eventGrounding
---
 .../streampipes/model/common.py                    |  3 +-
 .../streampipes/model/resource/data_stream.py      | 29 +++++++++++
 .../tests/client/test_endpoint.py                  |  2 +-
 streampipes-client-python/tests/model/__init__.py  |  0
 .../tests/model/resource/__init__.py               |  0
 .../tests/model/resource/test_data_stream.py       | 56 ++++++++++++++++++++++
 6 files changed, 88 insertions(+), 2 deletions(-)

diff --git a/streampipes-client-python/streampipes/model/common.py 
b/streampipes-client-python/streampipes/model/common.py
index c72cba3d8..fb7bcd797 100644
--- a/streampipes-client-python/streampipes/model/common.py
+++ b/streampipes-client-python/streampipes/model/common.py
@@ -47,6 +47,7 @@ class BasicModel(BaseModel):
         """
 
         alias_generator = _snake_to_camel_case
+        allow_population_by_field_name = True
 
 
 class BaseElement(BasicModel):
@@ -127,7 +128,7 @@ class TransportProtocol(BasicModel):
     element_id: Optional[StrictStr]
     broker_hostname: StrictStr
     topic_definition: TopicDefinition
-    port: StrictInt
+    port: StrictInt = Field(alias="kafkaPort")
 
 
 class TransportFormat(BasicModel):
diff --git 
a/streampipes-client-python/streampipes/model/resource/data_stream.py 
b/streampipes-client-python/streampipes/model/resource/data_stream.py
index 073160d02..8e3a5a67a 100644
--- a/streampipes-client-python/streampipes/model/resource/data_stream.py
+++ b/streampipes-client-python/streampipes/model/resource/data_stream.py
@@ -95,3 +95,32 @@ class DataStream(Resource):
     uri: Optional[StrictStr]
     dom: Optional[StrictStr]
     rev: Optional[StrictStr] = Field(alias="_rev")
+
+    def to_dict(self, use_source_names=True):
+        """Returns the resource in dictionary representation.
+
+        Parameters
+        ----------
+        use_source_names: bool
+            Indicates if the dictionary keys are in python representation or
+            equally named to the StreamPipes backend
+
+        Returns
+        ------
+        resource: Dict[str, Any]
+            The resource as dictionary representation
+
+        """
+
+        # This serves as a temporary fix for 
https://github.com/apache/streampipes/issues/1245
+        # should be removed as soon as possible
+
+        resource_dict = self.dict(by_alias=use_source_names)
+
+        if use_source_names and (transport_protocol_dict := 
resource_dict["eventGrounding"]["transportProtocols"][0])[
+            "@class"] != 
"org.apache.streampipes.model.grounding.KafkaTransportProtocol":
+            port = transport_protocol_dict.pop("kafkaPort")
+            transport_protocol_dict.update({"port": port})
+            resource_dict["eventGrounding"]["transportProtocols"][0] = 
transport_protocol_dict
+
+        return resource_dict
diff --git a/streampipes-client-python/tests/client/test_endpoint.py 
b/streampipes-client-python/tests/client/test_endpoint.py
index c0c8d870c..16a2b6d9d 100644
--- a/streampipes-client-python/tests/client/test_endpoint.py
+++ b/streampipes-client-python/tests/client/test_endpoint.py
@@ -212,7 +212,7 @@ class TestStreamPipesEndpoints(TestCase):
             any_order=True,
         )
         self.assertTrue(isinstance(result, DataStream))
-        self.assertEqual(result.dict(by_alias=True), self.data_stream_get)
+        self.assertEqual(result.to_dict(use_source_names=True), 
self.data_stream_get)
 
     @patch("streampipes.client.client.Session", autospec=True)
     def test_endpoint_post(self, http_session: MagicMock):
diff --git a/streampipes-client-python/tests/model/__init__.py 
b/streampipes-client-python/tests/model/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/streampipes-client-python/tests/model/resource/__init__.py 
b/streampipes-client-python/tests/model/resource/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/streampipes-client-python/tests/model/resource/test_data_stream.py 
b/streampipes-client-python/tests/model/resource/test_data_stream.py
new file mode 100644
index 000000000..3688df468
--- /dev/null
+++ b/streampipes-client-python/tests/model/resource/test_data_stream.py
@@ -0,0 +1,56 @@
+from unittest import TestCase
+
+from streampipes.model.resource import DataStream
+
+
+class TestDataStreamWorkaround(TestCase):
+    """
+    Testcase that assures behavior of workaround introduced
+    as a temporary fix for https://github.com/apache/streampipes/issues/1245
+    Needs to be removed as soon as possible
+    """
+
+    def test_nats_case(self):
+        data_stream_def = {
+            "elementId": "some-random-id",
+            "eventGrounding": {
+                "transportProtocols": [
+                    {
+                        "@class": 
"org.apache.streampipes.model.grounding.NatsTransportProtocol",
+                        "brokerHostname": "broker-host-name",
+                        "topicDefinition": {
+                            "@class": "some-class-name",
+                            "actualTopicName": "actual-topic-name"
+                        },
+                        "port": 50
+                    }
+                ]
+            }
+        }
+
+        data_stream = DataStream.parse_obj(data_stream_def)
+
+        self.assertEqual(50, 
data_stream.to_dict()["eventGrounding"]["transportProtocols"][0]["port"])
+
+    def test_kafka_case(self):
+
+        data_stream_def = {
+            "elementId": "some-random-id",
+            "eventGrounding": {
+                "transportProtocols": [
+                    {
+                        "@class": 
"org.apache.streampipes.model.grounding.KafkaTransportProtocol",
+                        "brokerHostname": "broker-host-name",
+                        "topicDefinition": {
+                            "@class": "some-class-name",
+                            "actualTopicName": "actual-topic-name"
+                        },
+                        "kafkaPort": 50
+                    }
+                ]
+            }
+        }
+
+        data_stream = DataStream.parse_obj(data_stream_def)
+
+        self.assertEqual(50, 
data_stream.to_dict()["eventGrounding"]["transportProtocols"][0]["kafkaPort"])

Reply via email to