CWrecker opened a new issue, #33229:
URL: https://github.com/apache/beam/issues/33229

   ### What would you like to happen?
   
   Apache Beam lacks a native Python-based IO connector that can ingest data 
directly from a socket. This feature would enable users to easily integrate 
streaming data sources, such as those emitting messages over TCP/IP sockets, 
into their Apache Beam pipelines.
   
   Many real-time data sources, such as custom data generators, IoT devices, 
and legacy systems, send data over sockets. Building a socket-based IO 
connector in Python would allow Beam pipelines to process this data seamlessly 
without requiring users to implement custom socket reading logic outside the 
Beam ecosystem.
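TCP delivers a continuous byte stream, not discrete messages, so any socket connector has to decide where one message ends and the next begins. Below is a minimal sketch that assumes newline-delimited messages (for example, one JSON object per line). That framing, the `LineFramer` class, and its `max_buffer` limit are illustrative assumptions; they are not part of Beam or of this proposal. The sketch keeps partial data across `recv()` calls and only returns complete messages.

```python
import socket
from typing import List


class LineFramer:
    """Splits a TCP byte stream into newline-delimited messages.

    Partial data is buffered until its delimiter arrives, because a single
    recv() call may return half a message or several messages at once.
    """

    def __init__(self, delimiter: bytes = b"\n", max_buffer: int = 1 << 20):
        self._delimiter = delimiter
        # Upper bound on buffered bytes, guarding against a peer that never
        # sends a delimiter.
        self._max_buffer = max_buffer
        self._buffer = bytearray()

    def feed(self, chunk: bytes) -> List[bytes]:
        """Adds one recv() chunk and returns every message it completes."""
        self._buffer.extend(chunk)
        messages = []
        while True:
            idx = self._buffer.find(self._delimiter)
            if idx < 0:
                break
            messages.append(bytes(self._buffer[:idx]))
            del self._buffer[: idx + len(self._delimiter)]
        if len(self._buffer) > self._max_buffer:
            raise ValueError("buffered data exceeds max_buffer without a delimiter")
        return messages


if __name__ == "__main__":
    # socketpair() returns two connected sockets, standing in for a TCP peer.
    server, client = socket.socketpair()
    # One message split across two sends, followed by a second message.
    client.sendall(b'{"team": "red", "sc')
    client.sendall(b'ore": 3}\n{"team": "blue", "score": 5}\n')
    client.close()
    framer = LineFramer()
    received = []
    while True:
        chunk = server.recv(4096)
        if not chunk:  # b"" means the peer closed the connection
            break
        received.extend(framer.feed(chunk))
    server.close()
    print(received)
```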
   
   
   Any advice on implementing an unbounded source would be appreciated. I have 
only recently begun to dig into Apache Beam. 
   
   **Additional Context**
   
   Existing IO connectors in Beam are often geared towards standard services 
like Kafka, Pub/Sub, etc. Adding support for sockets will cater to users 
dealing with more specialized or ad-hoc data sources.
   
   **Current approach: reading from a WebSocket**
   
   ```
   import datetime
   import json
   import logging
   
   import apache_beam as beam
   from websocket import create_connection  # provided by the websocket-client package
   
   
   class ReadFromWebSocket(beam.DoFn):
       """
       A custom DoFn to read messages from a WebSocket stream.
       """
       def __init__(self, ws_url):
           """
           Initializes the WebSocket reader with the target URL.
           """
           self.ws_url = ws_url
           self.ws_connection = None
   
       def setup(self):
           """
           Set up the WebSocket connection (runs once per DoFn instance).
           """
           self.ws_connection = create_connection(self.ws_url)
           logging.info("Connected to WebSocket: %s", self.ws_url)
   
       def process(self, element, *args, **kwargs):
           """
           Reads data from the WebSocket and outputs it as elements.
           """
           try:
               # This loop never returns, so the bundle containing the dummy
               # input element never finishes.
               while True:
                   # recv() blocks until the next message arrives, so no sleep
                   # is needed to avoid busy-waiting.
                   message = json.loads(self.ws_connection.recv())
                   # Stamp each element with the current (processing) time.
                   yield beam.window.TimestampedValue(
                       message, datetime.datetime.now().timestamp())
           except Exception as e:
               # Any error, including a single malformed JSON message, ends
               # the read loop and this element's processing.
               logging.error("Error while reading WebSocket: %s", e)
   
       def teardown(self):
           """
           Clean up the WebSocket connection.
           """
           if self.ws_connection:
               self.ws_connection.close()
               logging.info("Closed WebSocket connection to %s", self.ws_url)
   ```
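In the DoFn above, the `try` wraps the whole loop, so a single message that is not valid JSON stops reading for good. The sketch below decodes each message on its own and logs and skips bad ones instead. `decode_messages` and the optional `event_time_field` (a hypothetical key carrying an epoch-seconds event time) are illustrative names. The fallback to processing time mirrors what the original DoFn does.

```python
import json
import logging
import time
from typing import Callable, Iterable, Iterator, Optional, Tuple, Union


def decode_messages(
    raw_messages: Iterable[Union[str, bytes]],
    event_time_field: Optional[str] = None,
    now: Callable[[], float] = time.time,
) -> Iterator[Tuple[dict, float]]:
    """Yields (record, timestamp) pairs, skipping anything that is not a JSON object.

    `event_time_field` is a hypothetical key holding an epoch-seconds event
    time. When it is missing or unusable, processing time from `now()` is
    used, matching the original DoFn.
    """
    for raw in raw_messages:
        try:
            record = json.loads(raw)
        except ValueError as err:
            # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            # Log and skip: one bad message should not end the whole stream.
            logging.warning("Skipping malformed message %r: %s", raw, err)
            continue
        if not isinstance(record, dict):
            logging.warning("Skipping non-object message %r", raw)
            continue
        timestamp = now()
        if event_time_field is not None and event_time_field in record:
            try:
                timestamp = float(record[event_time_field])
            except (TypeError, ValueError):
                logging.warning("Bad %s in %r; using processing time",
                                event_time_field, raw)
        yield record, timestamp
```

Inside `process()`, each pair could be emitted as `beam.window.TimestampedValue(record, timestamp)`. When the message carries its own event time, using it gives windowing more meaningful timestamps than the moment the worker happened to read the message.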
   
   **Pipeline Example**
   
   ```
   import apache_beam as beam
   from apache_beam.transforms import trigger
   from apache_beam.transforms.trigger import AccumulationMode
   from apache_beam.transforms.window import GlobalWindows
   
   # `options` (pipeline options) and `ws_url` (the WebSocket URL) are defined elsewhere.
   with beam.Pipeline(options=options) as pipeline:
       # Start with a dummy source (PBegin) that triggers the custom DoFn
       (
           pipeline
           | "CreateStart" >> beam.Create([None])  # Start with a single dummy element
           | "ReadFromWebSocket" >> beam.ParDo(ReadFromWebSocket(ws_url))
           | "WindowIntoGlobal" >> beam.WindowInto(
               GlobalWindows(),
               trigger=trigger.Repeatedly(trigger.AfterCount(10)),
               accumulation_mode=AccumulationMode.ACCUMULATING)
           # Extract team/score pairs from the event data and sum the scores per team.
           | "ExtractAndSumScore" >> ExtractAndSumScore("team")
           | "PrintMessages" >> beam.Map(print)  # Replace with actual processing logic
       )
   ```
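The windowing step above pairs `GlobalWindows()` with `trigger.Repeatedly(trigger.AfterCount(10))` and `AccumulationMode.ACCUMULATING`. The pure-Python model below shows roughly what that configuration would emit from a per-key sum once elements actually reach it. It makes two assumptions: trigger state is tracked per key (Beam evaluates triggers per key and window), and each pane fires exactly at every 10th element. Real runners may fire later and batch elements differently, so treat this as an approximation of the semantics, not of any runner's behavior. `simulate_accumulating_count_trigger` is a hypothetical helper.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, List, Tuple


def simulate_accumulating_count_trigger(
    pairs: Iterable[Tuple[Hashable, int]], count: int = 10
) -> List[Tuple[Hashable, int]]:
    """Models Repeatedly(AfterCount(count)) + ACCUMULATING + sum in one global window.

    Returns the (key, running_sum) pane emitted each time a key has received
    another `count` elements since its previous firing.
    """
    running_sum: Dict[Hashable, int] = defaultdict(int)   # accumulated pane per key
    since_firing: Dict[Hashable, int] = defaultdict(int)  # elements since last firing
    fired: List[Tuple[Hashable, int]] = []
    for key, score in pairs:
        running_sum[key] += score
        since_firing[key] += 1
        if since_firing[key] == count:
            # ACCUMULATING: the pane includes everything seen so far for the key.
            fired.append((key, running_sum[key]))
            since_firing[key] = 0
    return fired
```

In this model, a key that has received fewer than 10 elements never produces output. In a low-volume test, that alone can look like a stall. With `AccumulationMode.DISCARDING`, each firing would instead carry only the sum of the elements since the previous firing.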
   
   ```
   import apache_beam as beam
   
   
   class ExtractAndSumScore(beam.PTransform):
     """A transform to extract key/score information and sum the scores.
     The constructor argument `field` determines whether 'team' or 'user' info is
     extracted.
     """
     def __init__(self, field):
       # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
       # super().__init__()
       beam.PTransform.__init__(self)
       self.field = field
   
     def expand(self, pcoll):
       # Printing pcoll here would only show the PCollection object at pipeline
       # construction time, not its elements.
       return (
           pcoll
           | beam.Map(lambda elem: (elem[self.field], elem['score']))
           | beam.CombinePerKey(sum))
   
   ```
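`ExtractAndSumScore` expects every element to be a mapping that contains the chosen `field` and a numeric `'score'`. The plain-Python analogue below shows the computation that the `Map` and `CombinePerKey(sum)` steps perform. It is a sketch with a hypothetical `extract_and_sum_score` name and ignores windows and triggers entirely. A message missing either key raises `KeyError`, just as the lambda in the transform would.

```python
from collections import Counter
from typing import Any, Dict, Hashable, Iterable, Mapping


def extract_and_sum_score(events: Iterable[Mapping[str, Any]], field: str) -> Dict[Hashable, Any]:
    """Sums event['score'] per distinct value of event[field]."""
    totals: Counter = Counter()
    for event in events:
        # Same key/value extraction as the Map lambda in ExtractAndSumScore.
        totals[event[field]] += event["score"]
    return dict(totals)
```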
   
   The pipeline above stalls once the WebSocket read is followed by windowing and an aggregation (`CombinePerKey`).
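One likely contributor to the stall, offered as a hypothesis rather than a diagnosis: `process()` in `ReadFromWebSocket` never returns, so the bundle holding the dummy element never completes. Runners generally do not pass a bundle's output on to a grouping step such as `CombinePerKey` until that bundle finishes. Unbounded reads in the Python SDK are usually built as splittable DoFns. These do a bounded amount of work per call and then checkpoint, so the runner can commit output and resume later. The stdlib-only sketch below shows that bounded-work shape for a raw TCP socket: it returns after `max_messages` messages or `max_wait_s` seconds, whichever comes first. `read_batch`, its parameters, and the newline framing are assumptions for illustration.

```python
import select
import socket
import time
from typing import List, Tuple


def read_batch(
    sock: socket.socket,
    pending: bytes = b"",
    max_messages: int = 100,
    max_wait_s: float = 1.0,
) -> Tuple[List[bytes], bytes, bool]:
    """Reads newline-delimited messages until a count or time limit is hit.

    Returns (messages, leftover, closed). `leftover` holds unconsumed bytes
    to pass back in as `pending` on the next call; `closed` is True when the
    peer has shut down the connection.
    """
    deadline = time.monotonic() + max_wait_s
    buffer = bytearray(pending)
    messages: List[bytes] = []
    closed = False
    while len(messages) < max_messages:
        # Emit complete lines already buffered before reading more.
        idx = buffer.find(b"\n")
        if idx >= 0:
            messages.append(bytes(buffer[:idx]))
            del buffer[: idx + 1]
            continue
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Wait for data, but never block past the deadline.
        readable, _, _ = select.select([sock], [], [], remaining)
        if not readable:
            break  # timed out: return what we have so the caller can finish
        chunk = sock.recv(4096)
        if not chunk:  # b"" means the peer closed the connection
            closed = True
            break
        buffer.extend(chunk)
    return messages, bytes(buffer), closed
```

In a splittable DoFn, each `process()` call would do one batch of this kind, yield its elements, and then checkpoint instead of looping forever. A caveat for the design: unlike Kafka or Pub/Sub, a plain socket offers no offsets to replay from. Messages already read by a worker that then fails cannot be re-read from the source.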
   
   
   
   ### Issue Priority
   
   Priority: 3 (nice-to-have improvement)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [X] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.