Kai-Michael Roesner created SPARK-40952:
-------------------------------------------
Summary: Exception when handling timestamp data in PySpark
Structured Streaming
Key: SPARK-40952
URL: https://issues.apache.org/jira/browse/SPARK-40952
Project: Spark
Issue Type: Bug
Components: PySpark, Structured Streaming, Windows
Affects Versions: 3.3.0
Environment: OS: Windows 10
Reporter: Kai-Michael Roesner
I'm trying to process data that contains timestamps in PySpark "Structured
Streaming" using the
[foreach|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreach]
option. When I run the job I get an `OSError: [Errno 22] Invalid argument`
exception in \pyspark\sql\types.py at the
{noformat}
return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)
{noformat}
statement.
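One possible reading of this failure, stated as an assumption rather than a confirmed diagnosis: `datetime.datetime.fromtimestamp` relies on the platform's C time functions, and on Windows these reject negative epoch values. A timestamp such as 00:59:59.999 on 1970-01-01 in a UTC+1 zone lies 1 ms before the Unix epoch, so `ts // 1000000` evaluates to `-1`. The sketch below shows a conversion that avoids `fromtimestamp` by adding a `timedelta` to the epoch. The helper name `micros_to_datetime` is hypothetical and not part of PySpark.

```python
import datetime

# Unix epoch as a timezone-aware UTC datetime.
EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def micros_to_datetime(ts):
    """Convert microseconds since the Unix epoch to an aware UTC datetime.

    Hypothetical helper for illustration: it uses plain datetime arithmetic
    instead of datetime.fromtimestamp, so negative inputs never reach the
    platform's C localtime function.
    """
    return EPOCH + datetime.timedelta(microseconds=ts)


# 1 ms before the epoch, i.e. the value assumed to trigger the error above.
print(micros_to_datetime(-1000))  # 1969-12-31 23:59:59.999000+00:00
```

The result is in UTC; converting it to local time would be a separate step that this sketch leaves out.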
I have boiled down my Spark job to the essentials:
{noformat}
from pyspark.sql import SparkSession

def handle_row(row):
    print(f'Processing: {row}')

spark = (SparkSession.builder
    .appName('test.stream.tstmp.byrow')
    .getOrCreate())

data = (spark.readStream
    .option('delimiter', ',')
    .option('header', True)
    .schema('a integer, b string, c timestamp')
    .csv('data/test'))

query = (data.writeStream
    .foreach(handle_row)
    .start())

query.awaitTermination()
{noformat}
In the `data/test` folder I have one csv file:
{noformat}
a,b,c
1,x,1970-01-01 00:59:59.999
2,y,1999-12-31 23:59:59.999
3,z,2022-10-18 15:53:12.345
{noformat}
If I change the csv schema to `'a integer, b string, c string'` everything
works fine and I get the expected output of
{noformat}
Processing: Row(a=1, b='x', c='1970-01-01 00:59:59.999')
Processing: Row(a=2, b='y', c='1999-12-31 23:59:59.999')
Processing: Row(a=3, b='z', c='2022-10-18 15:53:12.345')
{noformat}
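With the string schema, the timestamp text could still be parsed inside the row handler. The following is a minimal sketch of that idea, assuming the `yyyy-MM-dd HH:mm:ss.SSS` layout of the test file; `parse_ts` is a hypothetical helper, and the result is a naive `datetime` with no time zone attached.

```python
from datetime import datetime


def parse_ts(text):
    """Parse a 'YYYY-MM-DD HH:MM:SS.fff' string into a naive datetime.

    Hypothetical helper for illustration: strptime works on the calendar
    fields directly, so it does not go through datetime.fromtimestamp.
    """
    return datetime.strptime(text, '%Y-%m-%d %H:%M:%S.%f')


# The first value from the test file parses without an epoch conversion.
print(parse_ts('1970-01-01 00:59:59.999'))  # 1970-01-01 00:59:59.999000
```

Inside `handle_row` this could be called as `parse_ts(row.c)`, at the cost of losing the `timestamp` column type in the stream itself.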
Also, if I change the stream handling to
[micro-batches|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch]
like so:
{noformat}
...
def handle_batch(df, epoch_id):
    print(f'Processing: {df} - Epoch: {epoch_id}')
...
query = (data.writeStream
    .foreachBatch(handle_batch)
    .start())
{noformat}
I get the expected output of
{noformat}
Processing: DataFrame[a: int, b: string, c: timestamp] - Epoch: 0
{noformat}
But "by row" handling with `foreach` should also work when the row carries the
correct column data type of `timestamp`.