Kai-Michael Roesner created SPARK-40952:
-------------------------------------------

             Summary: Exception when handling timestamp data in PySpark Structured Streaming
                 Key: SPARK-40952
                 URL: https://issues.apache.org/jira/browse/SPARK-40952
             Project: Spark
          Issue Type: Bug
          Components: PySpark, Structured Streaming, Windows
    Affects Versions: 3.3.0
         Environment: OS: Windows 10 
            Reporter: Kai-Michael Roesner


I'm trying to process data that contains timestamps in PySpark Structured Streaming using the [foreach|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreach] operation. When I run the job, I get an `OSError: [Errno 22] Invalid argument` exception in \pyspark\sql\types.py at the
{noformat}
return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)
{noformat}
statement.
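A minimal sketch of the arithmetic in that statement may help show how it can fail. It assumes (this is not stated in the report) that the Spark session time zone is UTC+1, e.g. CET. Under that assumption the first sample value `1970-01-01 00:59:59.999` lies 1 ms before the Unix epoch, so the internal microsecond value `ts` is negative and `ts // 1000000` is `-1`. On Windows, `datetime.datetime.fromtimestamp` rejects negative arguments with `OSError: [Errno 22] Invalid argument`. The helper name `split_micros` is hypothetical, and the sketch deliberately does not call `fromtimestamp`, since that behavior is platform-dependent.

```python
import datetime

def split_micros(ts):
    """Split epoch microseconds into (seconds, microseconds) like the quoted line."""
    # Floor division rounds toward negative infinity, so a value just
    # before the epoch yields a negative seconds component.
    return ts // 1_000_000, ts % 1_000_000

# Assumption: the session time zone is UTC+1, so this wall-clock value
# is 1969-12-31 23:59:59.999 UTC, i.e. 1 ms before the epoch.
local_tz = datetime.timezone(datetime.timedelta(hours=1))
wall = datetime.datetime(1970, 1, 1, 0, 59, 59, 999000, tzinfo=local_tz)
epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)

# Exact integer microseconds since the epoch (no float rounding).
delta = wall - epoch
ts = (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds

print(ts)                # -1000
print(split_micros(ts))  # (-1, 999000): fromtimestamp(-1) is what fails on Windows
```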

I have boiled down my Spark job to the essentials:
{noformat}
from pyspark.sql import SparkSession

def handle_row(row):
  # Called once per row by foreach(); row is a pyspark.sql.Row
  print(f'Processing: {row}')

spark = (SparkSession.builder
  .appName('test.stream.tstmp.byrow')
  .getOrCreate())

# Stream the CSV files in data/test, parsing column c as a timestamp
data = (spark.readStream
  .option('delimiter', ',')
  .option('header', True)
  .schema('a integer, b string, c timestamp')
  .csv('data/test'))

# Hand each row to handle_row; the OSError occurs in this row-by-row path
query = (data.writeStream
  .foreach(handle_row)
  .start())

query.awaitTermination()
{noformat}

In the `data/test` folder I have one CSV file:
{noformat}
a,b,c
1,x,1970-01-01 00:59:59.999
2,y,1999-12-31 23:59:59.999
3,z,2022-10-18 15:53:12.345
{noformat}

If I change the CSV schema to `'a integer, b string, c string'`, everything works fine and I get the expected output:
{noformat}
Processing: Row(a=1, b='x', c='1970-01-01 00:59:59.999')
Processing: Row(a=2, b='y', c='1999-12-31 23:59:59.999')
Processing: Row(a=3, b='z', c='2022-10-18 15:53:12.345')
{noformat}
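Since the string schema works, one possible interim workaround (a sketch, not a fix for the bug) is to keep `c` as a string and parse it inside the handler with `datetime.datetime.strptime`, which never goes through `fromtimestamp`. This assumes every value matches the sample format `YYYY-MM-DD HH:MM:SS.fff`. The names `parse_c` and `FakeRow` are hypothetical; `FakeRow` only stands in for `pyspark.sql.Row` so the sketch runs without Spark. The handler returns the parsed value only to make the sketch easy to check, and `foreach` ignores it.

```python
import datetime
from collections import namedtuple

# Format of the c column in the sample CSV, e.g. '2022-10-18 15:53:12.345'.
C_FORMAT = '%Y-%m-%d %H:%M:%S.%f'

def parse_c(value):
    """Parse a c string into a naive datetime without calling fromtimestamp."""
    return datetime.datetime.strptime(value, C_FORMAT)

def handle_row(row):
    # Assumes the stream schema is 'a integer, b string, c string',
    # so row.c arrives as a plain string.
    c = parse_c(row.c)
    print(f'Processing: a={row.a}, b={row.b!r}, c={c!r}')
    return c

# Hypothetical stand-in for pyspark.sql.Row so the sketch runs without Spark.
FakeRow = namedtuple('FakeRow', ['a', 'b', 'c'])

if __name__ == '__main__':
    handle_row(FakeRow(1, 'x', '1970-01-01 00:59:59.999'))
```

Note that the parsed value is a naive wall-clock datetime; no time-zone conversion is applied, which is exactly what sidesteps the negative-epoch case.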

Also, if I change the stream handling to [micro-batches|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch] like so:
{noformat}
...
def handle_batch(df, epoch_id):
  print(f'Processing: {df} - Epoch: {epoch_id}')
...
query = (data.writeStream
  .foreachBatch(handle_batch)
  .start())
{noformat}
I get the expected output of
{noformat}
Processing: DataFrame[a: int, b: string, c: timestamp] - Epoch: 0
{noformat}

However, row-by-row handling with `foreach` should also work, with column `c` of each row keeping its correct data type, `timestamp`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
