hm, I tried the attached code. This code tries to simulates handling TIME
data in Spark using Parquet files. Since Spark does not support a direct
TIME datatype, it follows these steps:

   - Stores time as a STRING in a Parquet file using PyArrow.
   - Reads the Parquet file using PyArrow, Pandas, and Spark to verify the
   data.
   - Converts STRING to TIMESTAMP in Spark.
   - Extracts hour, minute, and second from the TIMESTAMP column.

If you run it, you will get this

Parquet file 'time_example_fixed.parquet' created successfully.
Checking if Parquet file exists...
Parquet file 'time_example_fixed.parquet' exists.
File size: 1686 bytes
Parquet file is not empty.
Reading with PyArrow
PyArrow Data Preview:
        event_time
0  12:34:56.789000
1  23:45:43.210000

Reading with Pandas
Pandas Data Preview:
        event_time
0  12:34:56.789000
1  23:45:43.210000

Reading with Spark

Read the Parquet file using Spark

Data loaded from Parquet file:
+----------+
|event_time|
+----------+
+----------+

root
 |-- event_time: string (nullable = true)

Convert STRING time to TIMESTAMP HH:mm:ss.SSSSSS format
Extract hours, minutes, and seconds
Data with extracted time components
+----------+----+------+------+
|event_time|hour|minute|second|
+----------+----+------+------+
+----------+----+------+------+

 It fails to display data in Spark while PyArrow and Pandas work fine. This
suggests that Spark is failing to interpret the Parquet file correctly.
The problem is likely caused by one or more of the following issues.

   1. PyArrow-Written Parquet Metadata Is not fully compatible with Spark
   2. Spark Reads the Schema but fails to detect rows
   3. Schema mismatch between PyArrow and Spark
   4. Corrupted or Empty Parquet File

Feel free to take the program and make it work

HTH

Dr Mich Talebzadeh,
Architect | Data Science | Financial Crime | Forensic Analysis | GDPR

   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>





On Wed, 12 Feb 2025 at 19:53, Max Gekk <max.g...@gmail.com> wrote:

> Hello Mich,
>
> > However, if you only need to work with time, you can do like below
>
> 1. Let's say a Spark SQL user would like to load TIME values stored in
> files in the parquet format which supports the TIME logical type
> https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#time.
> None of your proposed workarounds allow loading such data.
> 2. One of the goals to introduce the new type is simplifying migrations
> from systems that support the TIME type like PostgreSQL, Snowflake, Google
> SQL, Amazon Redshift, Teradata, DB2 to Spark SQL. Rewriting their existing
> SQL code to store TIME values as DATE or INTERVAL DAY TO SECOND looks ugly,
> IMHO, and can lead to error prone code.
>
> Yours faithfully,
> Max Gekk
>
> On Wed, Feb 12, 2025 at 8:13 PM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Not entirely convinced we need it!
>>
>> For example, Oracle does not have it.Oracle treats date and time as a
>> single entity, as they are often used together in real-world applications.
>> This approach simplifies many operations, such as sorting, filtering, and
>> calculations involving both date and time. However, if you only need to
>> work with time, you can do like below
>>
>>    1. Use DATE or TIMESTAMP to store time, and extract the time portion
>>    using TO_CHAR.
>>    2. Use INTERVAL DAY TO SECOND for durations or time intervals.
>>    3. If you only care about time, you can ignore the date portion or
>>    set it to a default value.
>>
>>  HTH
>>
>> Dr Mich Talebzadeh,
>> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>> On Wed, 12 Feb 2025 at 18:56, Sakthi <sak...@apache.org> wrote:
>>
>>> Thanks for the proposal, Max. This looks very promising. I'd also be
>>> happy to contribute if it helps with task completion!
>>>
>>> Regards,
>>> Sakthi
>>>
>>> On Wed, Feb 12, 2025 at 10:36 AM Max Gekk <max.g...@gmail.com> wrote:
>>>
>>>> Hi Dongjoon,
>>>>
>>>> > According to SPIP, is this targeting Apache Spark 4.2.0?
>>>>
>>>> Some tasks could be done in parallel, but if only one person will work
>>>> on this sequentially, in the worst case it might be finished close to 
>>>> 4.2.0.
>>>>
>>>> Best regards,
>>>> Max Gekk
>>>>
>>>> On Wed, Feb 12, 2025 at 5:48 PM Dongjoon Hyun <dongj...@apache.org>
>>>> wrote:
>>>>
>>>>> According to SPIP, is this targeting Apache Spark 4.2.0?
>>>>>
>>>>> > Q7. How long will it take?
>>>>> > In total it might take around 9 months.
>>>>>
>>>>> Dongjoon.
>>>>>
>>>>> On 2025/02/12 09:38:56 Max Gekk wrote:
>>>>> > Hi All,
>>>>> >
>>>>> > I would like to propose a new data type TIME which represents only
>>>>> time
>>>>> > values without the date part comparing to TIMESTAMP_NTZ. New type
>>>>> should
>>>>> > improve:
>>>>> > - migrations of SQL code from other DBMS where such type is supported
>>>>> > - read/write it from/to data sources such as parquet
>>>>> > - conform to the SQL standard
>>>>> >
>>>>> > SPIP: https://issues.apache.org/jira/browse/SPARK-51162
>>>>> >
>>>>> > Your comments and feedback would be greatly appreciated.
>>>>> >
>>>>> > Yours faithfully,
>>>>> > Max Gekk
>>>>> >
>>>>>
>>>>> ---------------------------------------------------------------------
>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>>>
>>>>>
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, hour, minute, second, col
from pyspark.sql.types import StructType, StructField, StringType

class TimeSimulator:
    def __init__(self, parquet_file, spark):
        self.parquet_file = parquet_file
        self.spark = spark

    def create_parquet_with_time_as_string(self):
        """
        Create a Spark-compatible Parquet file with TIME stored as STRING.
        """
        time_values = ["12:34:56.789000", "23:45:43.210000"]

        # Create a Pandas DataFrame
        df = pd.DataFrame({"event_time": time_values})

        # Convert Pandas DataFrame to PyArrow Table
        table = pa.Table.from_pandas(df)

        # Ensure PyArrow writes a file with explicit row groups & proper metadata
        pq.write_table(
            table, self.parquet_file, row_group_size=10, use_dictionary=False
        )

        print(f"\nParquet file '{self.parquet_file}' created successfully.")

    def process_parquet_with_spark(self):
        """
        Read and process the Parquet file using Spark.
        """

        print("\nChecking if Parquet file exists...")
        if os.path.exists(self.parquet_file):
            print(f"\nParquet file '{self.parquet_file}' exists.")

            # Check file size
            file_size = os.path.getsize(self.parquet_file)
            print(f"File size: {file_size} bytes")

            if file_size == 0:
                print("\nERROR: The Parquet file is empty!")
                exit(1)
            else:
                print("\nParquet file is not empty.")
        else:
            print(f"\nERROR: Parquet file '{self.parquet_file}' does NOT exist.")
            exit(1)

        print("\nReading with PyArrow")
        try:
            table = pq.read_table(self.parquet_file)
            df_pyarrow = table.to_pandas()
            print("\nPyArrow Data Preview:")
            print(df_pyarrow)
        except Exception as e:
            print(f"\nERROR: PyArrow failed to read the Parquet file: {e}")

        print("\nReading with Pandas")
        try:
            df_pandas = pd.read_parquet(self.parquet_file, engine="pyarrow")
            print("\nPandas Data Preview:")
            print(df_pandas)
        except Exception as e:
            print(f"\nERROR: Pandas failed to read the Parquet file: {e}")

        print("\nReading with Spark")

        # Manually define schema to prevent Spark inference failure
        schema = StructType([StructField("event_time", StringType(), True)])

        print("\nRead the Parquet file using Spark")
        df = self.spark.read.schema(schema).parquet(self.parquet_file)

        print("\nData loaded from Parquet file:")
        df.show(truncate=False)
        df.printSchema()

        print("\nConvert STRING time to TIMESTAMP HH:mm:ss.SSSSSS format")
        df = df.withColumn("event_time", to_timestamp(col("event_time"), "HH:mm:ss.SSSSSS"))

        print("\nExtract hours, minutes, and seconds")
        df = df.withColumn("hour", hour(df["event_time"])) \
               .withColumn("minute", minute(df["event_time"])) \
               .withColumn("second", second(df["event_time"]))

        print("\nData with extracted time components")
        df.show(truncate=False)

        # Stop Spark session
        self.spark.stop()

def main():
    appName = "ParquetTimeExample"
    spark = SparkSession.builder.appName(appName).enableHiveSupport().getOrCreate()
    
    # Set the log level to ERROR to reduce verbosity
    spark.sparkContext.setLogLevel("ERROR")
    parquet_file = "time_example_fixed.parquet"

    # Create an instance of TimeSimulator
    simulator = TimeSimulator(parquet_file, spark)

    # Run the methods
    simulator.create_parquet_with_time_as_string()
    simulator.process_parquet_with_spark()

if __name__ == "__main__":
    main()

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org

Reply via email to