Ah, this looks like a bug in Beam on Windows: send_signal(signal.SIGINT) is not a cross-platform way to close a process. We should probably use terminate or kill [1] here instead. I opened BEAM-12501 [2] for this issue.
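
For illustration, here is a minimal sketch of what a cross-platform stop could look like. This is not the actual Beam change; stop_process and its timeout parameter are hypothetical names, and it assumes the server is an ordinary subprocess.Popen child. It relies only on Popen.terminate, Popen.wait and Popen.kill, which exist on both POSIX and Windows.

```python
import subprocess
import sys


def stop_process(process, timeout=5.0):
    """Stop a child process without sending POSIX-only signals.

    Hypothetical helper for illustration; not Beam's implementation.
    """
    # Nothing to do if the child has already exited.
    if process.poll() is not None:
        return process.returncode

    # terminate() sends SIGTERM on POSIX and calls TerminateProcess on
    # Windows, so it does not raise "Unsupported signal" there.
    process.terminate()
    try:
        return process.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        # Fall back to a forced kill if the child ignores the request.
        process.kill()
        return process.wait()


# Usage: start a long-running child, then stop it.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
stop_process(child)
```

The trade-off is that terminate gives the child less chance to shut down gracefully than SIGINT does on POSIX, so a real fix might keep SIGINT where it is supported and fall back to terminate elsewhere.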
+dev <[email protected]> for awareness - I think this will affect most external transforms in Python.

Thanks for letting us know about this, Igor.

[1] https://docs.python.org/3/library/subprocess.html#subprocess.Popen.terminate
[2] https://issues.apache.org/jira/browse/BEAM-12501

On Tue, Jun 15, 2021 at 5:19 PM Igor Gois <[email protected]> wrote:

> Hi Brian,
>
> Thank you for your clarification.
>
> Actually, I am only trying to run a simple batch pipeline using the Sql
> transform locally. [1]
>
> The Kafka error didn't happen to me. I only mentioned it because I found
> the same error message on google.
>
> Here is the full error:
> Traceback (most recent call last):
>   File "beam-sql.py", line 18, in <module>
>     |'sql print' >> beam.Map(print)
>   File "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\pvalue.py", line 142, in __or__
>     return self.pipeline.apply(ptransform, self)
>   File "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\pipeline.py", line 641, in apply
>     transform.transform, pvalueish, label or transform.label)
>   File "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\pipeline.py", line 651, in apply
>     return self.apply(transform, pvalueish)
>   File "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\pipeline.py", line 694, in apply
>     pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
>   File "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\runners\runner.py", line 188, in apply
>     return m(transform, input, options)
>   File "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\runners\runner.py", line 218, in apply_PTransform
>     return transform.expand(input)
>   File "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\transforms\external.py", line 304, in expand
>     pipeline.local_tempdir)
>   File "c:\users\XXX\appdata\local\programs\python\python37\lib\contextlib.py", line 119, in __exit__
>     next(self.gen)
>   File "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\transforms\external.py", line 351, in _service
>     yield stub
>   File "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\transforms\external.py", line 503, in __exit__
>     self._service_provider.__exit__(*args)
>   File "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\utils\subprocess_server.py", line 72, in __exit__
>     self.stop()
>   File "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\utils\subprocess_server.py", line 131, in stop
>     self.stop_process()
>   File "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\utils\subprocess_server.py", line 181, in stop_process
>     return super(JavaJarServer, self).stop_process()
>   File "C:\Users\XXX\Documents\8-Git\apache_beam\venv\lib\site-packages\apache_beam\utils\subprocess_server.py", line 141, in stop_process
>     self._process.send_signal(signal.SIGINT)
>   File "c:\users\XXX\appdata\local\programs\python\python37\lib\subprocess.py", line 1306, in send_signal
>     raise ValueError("Unsupported signal: {}".format(sig))
> ValueError: Unsupported signal: 2
>
> Thank you again and congratulations for the youtube video. It's very nice!
>
> [1] https://stackoverflow.com/questions/67977704/apache-beam-with-sqltransform-in-direct-runner/67990831#67990831
>
> Regards,
>
> Igor Gois
>
> On Tue, Jun 15, 2021 at 19:56, Brian Hulette <[email protected]> wrote:
>
>> Hi Igor,
>>
>> "Universal Local Runner" is a term we've used in the past for a runner
>> that executes your pipeline locally. It's similar to each SDK's
>> DirectRunner, except that by leveraging portability we should only need one
>> implementation, making it "universal".
>> I don't think we've been using that term recently, I'm sorry I mentioned
>> it in that talk and confused things.
>>
>> The Python DirectRunner is basically the ULR since it is a portable
>> runner. Unfortunately there's one big caveat: Python's portable
>> DirectRunner (also called FnApiRunner) doesn't support streaming right now.
>> So when you use the DirectRunner for a streaming Python pipeline, it ends
>> up running on the Python SDK's non-portable DirectRunner. I suspect that's
>> the issue you're running into here: SqlTransform and KafkaIO in Python both
>> will only work on portable runners, but you likely are trying to run a
>> streaming pipeline if you're using KafkaIO.
>>
>> It's hard to tell for sure from that error message though, could you
>> share the full stacktrace?
>>
>> Brian
>>
>> On Tue, Jun 15, 2021 at 3:05 PM Igor Gois <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I am trying to run Sql transform on windows using direct runner and
>>> apache beam (2.30.0):
>>>
>>> import apache_beam as beam
>>> from apache_beam.transforms.sql import SqlTransform
>>>
>>> with beam.Pipeline() as p:
>>>     pipe = (
>>>         p
>>>         |'hello' >> beam.Create([('SE',400),('SC',500)])
>>>         |'schema' >> beam.Map(lambda x: beam.Row(
>>>             state=x[0],
>>>             population=x[1]
>>>         ))
>>>     )
>>>
>>>     sql = (
>>>         pipe
>>>         |'sql' >> SqlTransform('SELECT state, population FROM PCOLLECTION')
>>>         |'sql print' >> beam.Map(print)
>>>     )
>>>
>>> And I got this error:
>>>
>>>   File "c:\users\XXX\appdata\local\programs\python\python37\lib\subprocess.py", line 1306, in send_signal
>>>     raise ValueError("Unsupported signal: {}".format(sig))
>>> ValueError: Unsupported signal: 2
>>>
>>> I followed this video on youtube [1] and it mentions Universal Local
>>> Runner (ULR) but I didn't find anything about it on [2]. I also found a
>>> similar error on [3] but didn't figure out how to solve it and was related
>>> to kafka.
>>>
>>> Can anyone help me?
>>>
>>> Thanks in advance
>>>
>>> [1] https://youtu.be/zx4p-UNSmrA?t=2097
>>> [2] https://beam.apache.org/documentation/runners/direct/
>>> [3] https://stackoverflow.com/questions/65780044/readfromkafka-throws-valueerror-unsupported-signal-2
>>>
>>> Igor Gois
