Treize44 commented on issue #33759:
URL: https://github.com/apache/arrow/issues/33759#issuecomment-1402240611

   While writing code to write and read the dataset, I have noticed that my 
problem is linked to the size of the metadata of the schema ; is there a limit ?
   # Here is my code : 
   
   import json
   from typing import Any
   
   import numpy as np
   import pandas as pd
   import pyarrow as pa
   import pyarrow.dataset as ds
   
   DATASET_HEADER = "dataset_header"
   PARQUET_FLAVOR = "hive"
   PARQUET_FORMAT = "parquet"
   TIME_COLUMN = "gps_time_stamp"
   DEFAULT_PARTITIONING: ds.Partitioning = ds.partitioning(
       pa.schema([(TIME_COLUMN, pa.timestamp("us"))]),
       flavor=PARQUET_FLAVOR,
   )
   
   dataset_path = "/tmp/dataset"
   nb_data_values = 60000
   
   
   def generate_dataframe():
       nb_rows = 400
       data = np.float32(np.random.normal(size=nb_data_values))
       return pd.DataFrame([{"ATTRIBUTE": i, "data": data} for i in 
range(nb_rows)])
   
   
   def write_fragment(df, path):
       dataset_header: dict[str, Any] = {}
       dataset_header["sampling"] = np.arange(0, nb_data_values, 1).tolist()  # 
if removed, the problem disapear
       schema_with_metadata = 
pa.Schema.from_pandas(df).with_metadata({DATASET_HEADER: 
json.dumps(dataset_header)})
       ds.write_dataset(
           data=pa.Table.from_pandas(df).cast(schema_with_metadata),
           base_dir=path,
           format=PARQUET_FORMAT,
           partitioning=DEFAULT_PARTITIONING,
           existing_data_behavior="overwrite_or_ignore",
           
file_options=ds.ParquetFileFormat().make_write_options(allow_truncated_timestamps=True),
       )
   
   
   def write():
       trace_length_pd = pd.Timedelta(milliseconds=60000)
       first_timestamp = pd.Timestamp("2023-01-23 09:15:00.000000")
       nb_timestamps = 260
       for i in range(nb_timestamps):
           df = generate_dataframe()
           df[TIME_COLUMN] = pd.Timestamp(first_timestamp + i * trace_length_pd)
           print("Generating data for timestamp ", i)
           write_fragment(df, dataset_path)
   
   
   def read():
       dataset = ds.dataset(dataset_path, format="parquet", 
partitioning=DEFAULT_PARTITIONING)
       unique_values = set()
       for batch in dataset.to_batches(columns=[TIME_COLUMN]):
           unique_values.update(batch.column(TIME_COLUMN).unique().to_numpy())
       print(len(unique_values))
   
   
   write()
   read()


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to