pritamdodeja commented on issue #35867:
URL: https://github.com/apache/beam/issues/35867#issuecomment-3207983763

   Here's how I can reproduce the bug:
   
   Execute ```debug_prism.py``` with ```MyRecordSchema.py``` in the same 
directory
   
   I had suspected this had something to do with Metrics, but I'm not so sure 
anymore about that.
   
   ```
   # debug_prism.py
   import csv
   import datetime
   import logging
   import os
   import random
   import shutil
   import tempfile
   from typing import (  
       Any,
       NamedTuple,
       Optional,
       get_args,
   )
   
   import apache_beam as beam
   from apache_beam.metrics import Metrics
   from apache_beam.options.pipeline_options import PipelineOptions
   from apache_beam.transforms import window
   
   from MyRecordSchema import MyRecord
   
   # --- Configuration ---
   OUTPUT_FILE = 'random_data.csv'
   NUMBER_OF_ROWS = 5000 # PrismRunner fails at this number of rows, but 
succeeds for lower number of rows (e.g. 2000)
   START_DATE = datetime.datetime(2016, 1, 1, 0, 0, 0)
   END_DATE = datetime.datetime(2016, 6, 1, 0, 0, 0)
   print(f"Generating data for '{OUTPUT_FILE}' from {START_DATE} to {END_DATE} 
(max {NUMBER_OF_ROWS} rows)...")
   
   current_time = START_DATE
   rows_written = 0
   DATA_CONFIG = {
       # Integers with a specific range
       "rybq": {"type": "integer", "params": (1, 5)},
       "qjmw": {"type": "integer", "params": (1, 5)},
       "ksel": {"type": "integer", "params": (1, 5)},
       "eqqq": {"type": "integer", "params": (2000, 2500)},
       "xrus": {"type": "integer", "params": (30, 50)},
       "biej": {"type": "integer", "params": (400, 500)},
       "gftv": {"type": "integer", "params": (350, 450)},
       "qcpd": {"type": "integer", "params": (400, 450)},
       "hnox": {"type": "integer", "params": (60, 80)},
   
       # Normally distributed floats
       "fthm": {"type": "normal", "params": (65.0, 2.0)},
       "xkqg": {"type": "normal", "params": (65.0, 2.0)},
       "fuil": {"type": "normal", "params": (78.0, 1.0)},
       "iusu": {"type": "normal", "params": (78.0, 1.0)},
   
       "sczf": {"type": "integer", "params": (1, 5)},
       "atlf": {"type": "integer", "params": (2300, 2500)},
       "nedy": {"type": "integer", "params": (30, 40)},
   
       # Uniformly distributed floats
       "kxyq": {"type": "uniform", "params": (20.0, 30.0)},
       "gmla": {"type": "uniform", "params": (60.0, 70.0)},
       "ekld": {"type": "uniform", "params": (60.0, 70.0)},
       "hwve": {"type": "uniform", "params": (80.0, 90.0)},
       "tklu": {"type": "uniform", "params": (80.0, 90.0)},
       "pzoa": {"type": "uniform", "params": (0.0, 5.0)},
       "qmkg": {"type": "uniform", "params": (0.0, 5.0)},
       "pazj": {"type": "uniform", "params": (0.0, 1.0)},
       "qdym": {"type": "uniform", "params": (0.0, 1.0)},
       "xyio": {"type": "uniform", "params": (50.0, 60.0)},
       "vxtg": {"type": "uniform", "params": (25.0, 35.0)},
       "nzpk": {"type": "uniform", "params": (25.0, 35.0)},
       "ypvj": {"type": "uniform", "params": (20.0, 25.0)},
   
       "mohz": {"type": "integer", "params": (1, 10)},
       "vpfz": {"type": "integer", "params": (50, 60)},
       "efoo": {"type": "uniform", "params": (25.0, 35.0)},
       "aggn": {"type": "uniform", "params": (25.0, 35.0)},
       "wlwb": {"type": "integer", "params": (20, 30)},
       "bjqq": {"type": "uniform", "params": (70.0, 80.0)},
       "wzbq": {"type": "uniform", "params": (40.0, 50.0)},
       "juks": {"type": "uniform", "params": (30.0, 40.0)},
       "dvdl": {"type": "uniform", "params": (20.0, 30.0)},
   
       # Categorical data
       "avit": {"type": "categorical", "params": [0, 1]},
       "nrha": {"type": "categorical", "params": [0, 1]},
       "lejm": {"type": "categorical", "params": [0, 1]},
       "kwyk": {"type": "categorical", "params": [0, 1]},
       "prqy": {"type": "categorical", "params": [0, 1]},
       "putr": {"type": "categorical", "params": [0, 1]},
       "cpwn": {"type": "categorical", "params": [0, 1]},
       "dwlo": {"type": "categorical", "params": [0, 1]},
       "vhol": {"type": "categorical", "params": [1, 2, 3]},
       "wrmw": {"type": "categorical", "params": [1, 2, 3]},
       "gnmk": {"type": "categorical", "params": [1, 2, 3]},
       "neex": {"type": "categorical", "params": [1, 2, 3]},
       "ylgu": {"type": "categorical", "params": [1, 2, 3, 4]},
       "kvyd": {"type": "categorical", "params": [1, 2, 3, 4]},
       "ywge": {"type": "categorical", "params": [1, 2, 3, 4]},
       "iuvv": {"type": "categorical", "params": [1, 2, 3, 4]},
       "kaij": {"type": "categorical", "params": [1, 2, 3, 4, 5]},
       "nbgs": {"type": "categorical", "params": [1, 2, 3, 4, 5]},
       "llxf": {"type": "categorical", "params": [1, 2, 3, 4, 5]},
       "mwie": {"type": "categorical", "params": [1, 2, 3, 4, 5]},
   }
   
   MISSING_VALUE_CONFIG = {
       "probability": 0.15,  # 15% chance for a value to be missing
       "features": [
           "cpwn", "dwlo", "ekld", "hnox", "iusu", "kaij", "llxf", "mwie",
           "nbgs", "prqy", "putr", "qcpd", "tklu", "xkqg"
       ]
   }
   HEADER = ["timestamp"] + list(DATA_CONFIG.keys())
   
   
   def generate_random_row(current_timestamp):
       """Generates a single row of random data based on the DATA_CONFIG."""
       row = {"timestamp": current_timestamp.strftime('%Y-%m-%d %H:%M:%S')}
   
       for feature, config in DATA_CONFIG.items():
           # Check if this feature should have a missing value
           if feature in MISSING_VALUE_CONFIG["features"] and random.random() < 
MISSING_VALUE_CONFIG["probability"]:
               row[feature] = ''
               continue
   
           # Generate data based on the specified type
           if config["type"] == "normal":
               mean, std_dev = config["params"]
               row[feature] = random.normalvariate(mean, std_dev)
           elif config["type"] == "uniform":
               min_val, max_val = config["params"]
               row[feature] = random.uniform(min_val, max_val)
           elif config["type"] == "integer":
               min_val, max_val = config["params"]
               row[feature] = random.randint(min_val, max_val)
           elif config["type"] == "categorical":
               row[feature] = random.choice(config["params"])
           else:
               row[feature] = ''
   
       return [row[h] for h in HEADER]
   
   
   with open(OUTPUT_FILE, 'w', newline='') as csvfile:
       writer = csv.writer(csvfile)
       
       # Write the header row
       writer.writerow(HEADER)
       
       # Write the data rows until we reach the end date or max rows
       while current_time <= END_DATE and rows_written < NUMBER_OF_ROWS:
           # Increment the timestamp by a random amount (e.g., 1 to 300 seconds)
           current_time += datetime.timedelta(seconds=random.randint(1, 300))
           
           row_data = generate_random_row(current_time)
           writer.writerow(row_data)
           rows_written += 1
           
   print(f"Data generation complete. Wrote {rows_written} rows.")
   
   # Data Processing Section
   
   OUTPUT_FREQUENCY_SECONDS = 60
   TARGET_SEQUENCE_STEPS = 20
   WINDOW_SIZE_SECONDS = TARGET_SEQUENCE_STEPS * OUTPUT_FREQUENCY_SECONDS
   INPUT_FILE_NO_HEADER = "./random_data.csv"
   
   beam.coders.registry.register_coder(MyRecord, beam.coders.RowCoder)
   
   
   class PartitionDoFn(beam.DoFn):
       def __init__(self, fixed_threshold=None):
           self._fixed_threshold = fixed_threshold
   
       def setup(self):
           import logging
           self.logger = logging
   
       def process(self, element_tuple, threshold_from_side_input=None):
           current_key = element_tuple[0]
           actual_threshold = self._fixed_threshold
           if actual_threshold is None:
               if threshold_from_side_input is None:
                   self.logger.error("Class {__class__.__name__} PartitionDoFn: 
No fixed threshold set and no threshold provided from side input. Halting.")
                   raise ValueError("PartitionDoFn requires a threshold.")
               actual_threshold = threshold_from_side_input
           if current_key <= actual_threshold:
               yield beam.pvalue.TaggedOutput('train', element_tuple)
           else:
               yield beam.pvalue.TaggedOutput('eval', element_tuple)
   
   
   class AddTimestamps(beam.PTransform):
       """Assigns event timestamps to records based on their 'timestamp' 
field."""
       def __init__(self, obj):
           self.obj = obj
       class _AddTimestampsDoFn(beam.DoFn):  # Renamed inner class
           def setup(self):
               import logging
               self.logger = logging
   
           def process(self, element):
               """Yields the element wrapped in TimestampedValue."""
               try:
                   ts = element.timestamp
                   if not isinstance(ts, datetime.datetime):
                       self.logger.error(f"Class {__class__.__name__}: 
AddTimestamps: Expected datetime, got {type(ts)} for {element}")
                   beam_timestamp = ts.timestamp()
                   yield beam.window.TimestampedValue(element, beam_timestamp)
               except Exception as e:
                   self.logger.error(f"Class {__class__.__name__}: Error in 
AddTimestamps for UUID {element}, timestamp '{element.timestamp}': {e}")
   
       def expand(self, input_pcollection):
           return input_pcollection | "AddTS_DoFn" >> beam.ParDo(
               self._AddTimestampsDoFn())
   
   
   class ResampleWindowedDataAndAssignStartKey(beam.DoFn):
       def __init__(self, sampling_frequency=30, target_steps=20, obj=None):
           self._sampling_frequency = sampling_frequency
           self._target_steps = target_steps
           self.processed_windows_counter = 
Metrics.counter(self.__class__.__name__, 'processed_windows')
           self.empty_windows_counter = 
Metrics.counter(self.__class__.__name__, 'empty_or_skipped_windows')
           self.records_per_window_dist = 
Metrics.distribution(self.__class__.__name__, 'records_per_window')
           self.obj = obj
   
       def setup(self):
           import logging
           self.logger = logging
           import pandas as pd
           self._pd = pd
           from datetime import datetime, timedelta
           self._datetime = datetime
           self._timedelta = timedelta
           import numpy as np
           self._np = np
   
       def resample_asof(
               self,
               myrecord_df,
               start_time,
               sampling_frequency,
               num_target_steps,
       ):
           """
           Resamples a DataFrame to a new frequency starting from a specified 
time
           for a specific number of steps, using 'as of' logic.
           """
           expected_cols = ['timestamp']
           if myrecord_df is not None and hasattr(myrecord_df, 'columns'):
               expected_cols.extend([col for col in myrecord_df.columns if col 
!= 'timestamp'])
   
           if myrecord_df is None or myrecord_df.empty:
               self.logger.debug(f"{self.__class__.__name__}.resample_asof 
received empty or None DataFrame.")
               return self._pd.DataFrame(columns=expected_cols)
   
           df = myrecord_df.copy()
   
           if 'timestamp' not in df.columns:
               if df.index.name == 'timestamp' and isinstance(df.index, 
self._pd.DatetimeIndex):
                   df = df.reset_index()
               else:
                   self.logger.error("Class {__class__.__name__} 'timestamp' 
column not found in DataFrame for resample_asof.")
                   return self._pd.DataFrame(columns=expected_cols)
           
           if df.empty:
               self.logger.debug("Class {__class__.__name__} DataFrame became 
empty after handling timestamp column for resample_asof.")
               return self._pd.DataFrame(columns=expected_cols)
   
           if not self._pd.api.types.is_datetime64_any_dtype(df['timestamp']):
               try:
                   df['timestamp'] = self._pd.to_datetime(df['timestamp'])
               except Exception as e:
                   self.logger.error(f"Class {__class__.__name__}: Could not 
convert 'timestamp' column to datetime in resample_asof: {e}")
                   return self._pd.DataFrame(columns=expected_cols)
   
           df = df.sort_values('timestamp').reset_index(drop=True)
           if df.empty:
               self.logger.debug("DataFrame empty after sorting in 
resample_asof.")
               return self._pd.DataFrame(columns=expected_cols)
   
           if not isinstance(start_time, self._datetime):
               self.logger.error(f"Class {__class__.__name__}: resample_asof 
expects start_time to be datetime.datetime, got {type(start_time)}")
               return self._pd.DataFrame(columns=expected_cols)
           
           actual_sampling_frequency = sampling_frequency
           if not (actual_sampling_frequency and
                   hasattr(actual_sampling_frequency, 'total_seconds') and
                   actual_sampling_frequency.total_seconds() > 0):
               self.logger.error(f"Class {__class__.__name__}: Invalid 
actual_sampling_frequency: {actual_sampling_frequency} in resample_asof.")
               return self._pd.DataFrame(columns=expected_cols)
   
           try:
               df = df.set_index('timestamp')
           except Exception as e:
               self.logger.error(f"Class {__class__.__name__}: Error setting 
'timestamp' as index in resample_asof: {e}")
               return self._pd.DataFrame(columns=expected_cols)
               
           df_filtered = df[df.index <= start_time + 
self._timedelta(seconds=self._target_steps * self._sampling_frequency)]
   
           if df_filtered.empty:
               self.logger.debug(f"Class {__class__.__name__}: DataFrame empty 
after filtering by start_time '{start_time}' in resample_asof.")
               return self._pd.DataFrame(columns=expected_cols)
   
           resampled_index = None
           if num_target_steps > 0:
               try:
                   resampled_index = self._pd.date_range(
                       start=start_time,
                       periods=num_target_steps,
                       freq=actual_sampling_frequency
                   )
               except ValueError as e:
                   self.logger.error(f"Class {__class__.__name__}: Error 
creating date_range with start={start_time}, periods={num_target_steps}, 
freq={actual_sampling_frequency}: {e}")
                   return self._pd.DataFrame(columns=expected_cols)
           else:
               self.logger.warning(f"Class {__class__.__name__}: 
num_target_steps is not positive ({num_target_steps}), cannot generate 
resampled_index.")
               return self._pd.DataFrame(columns=expected_cols)
   
           if resampled_index is None or resampled_index.empty:
               self.logger.warning("Resampled index is None or empty in 
resample_asof.")
               return self._pd.DataFrame(columns=expected_cols)
   
           resampled_df = df_filtered.reindex(resampled_index, method='ffill')
           
           if resampled_df.empty:
               self.logger.debug("DataFrame empty after reindex in 
resample_asof.")
               return self._pd.DataFrame(columns=expected_cols)
               
           resampled_df = resampled_df.reset_index()
           resampled_df = resampled_df.rename(columns={'index': 'timestamp'})
   
           value_columns = [col for col in resampled_df.columns if col != 
'timestamp']
           if value_columns:
               resampled_df = resampled_df.dropna(subset=value_columns, 
how='all')
           
           if resampled_df.empty:
               self.logger.debug("Class {__class__.__name__} Resampled 
DataFrame became empty after dropna in resample_asof.")
               return self._pd.DataFrame(columns=expected_cols)
   
           final_ordered_columns = [col for col in expected_cols if col in 
resampled_df.columns]
           for col in resampled_df.columns:
               if col not in final_ordered_columns:
                   final_ordered_columns.append(col)
           
           return resampled_df[final_ordered_columns]
   
       def process(
               self,
               element: tuple[int, list[Any]],
               window=beam.DoFn.WindowParam
               ):
           
           window_start_key, records_list = element
           num_records = len(records_list)
   
           self.processed_windows_counter.inc()
           self.records_per_window_dist.update(num_records)
   
           if num_records > 1000:  # Adjust this threshold as needed
               self.logger.warning(
                   f"Processing a LARGE window (key: {window_start_key}) with 
{num_records} records. "
                   f"This may indicate data skew and is a likely bottleneck."
               )
   
           if not records_list:
               self.logger.debug(f"{self.__class__.__name__}.process received 
empty list.")
               self.empty_windows_counter.inc()
               yield None, None
   
           if not element[1]:
               self.logger.debug(f"{self.__class__.__name__}.process received 
empty list.")
               yield None, None
   
           # Timestamps in 'element' should be datetime.datetime objects
           sorted_original_list = sorted(element[1], key=lambda x: x.timestamp)
   
           if not sorted_original_list:
               # This case should ideally not be reached if 'element' is not 
empty
               self.logger.debug(f"{self.__class__.__name__}: 
sorted_original_list is empty.")
               yield None, None
   
           dlist = [d._asdict() for d in sorted_original_list]
           myrecord_df_original = self._pd.DataFrame(dlist)
   
           if myrecord_df_original.empty or 'timestamp' not in 
myrecord_df_original.columns:
               self.logger.debug(f"{self.__class__.__name__}: DataFrame from 
original list is empty or missing 'timestamp'.")
               yield None, None
               
           start_time_for_resampling = myrecord_df_original.iloc[0]['timestamp']
           
           if isinstance(start_time_for_resampling, self._pd.Timestamp):
               start_time_for_resampling = 
start_time_for_resampling.to_pydatetime()
           
           if self._pd.isna(start_time_for_resampling):
               self.logger.warning(f"{self.__class__.__name__}: 
start_time_for_resampling is NaT.")
               yield None, None
   
           sampling_timedelta = 
self._timedelta(seconds=self._sampling_frequency)
   
           resampled_df = self.resample_asof(
               myrecord_df_original,
               start_time=start_time_for_resampling,
               sampling_frequency=sampling_timedelta,
               num_target_steps=self._target_steps
           )
   
           if resampled_df.empty or 'timestamp' not in resampled_df.columns:
               self.logger.debug(f"{self.__class__.__name__}: Resampling for 
window starting near {start_time_for_resampling} yielded empty/invalid 
DataFrame.")
               yield None, None
           
           # Calculate 'internal_sequence' for the resampled data.
           # Assumes resampled_df['timestamp'] are UTC-aware pandas Timestamps.
           resampled_df['internal_sequence'] = 
(resampled_df['timestamp'].astype(self._np.int64) // 10**9)
           # Removed: + 18000 
   
           myrecord_list_resampled = []
           for i in range(len(resampled_df)):
               myrecord_dict = resampled_df.iloc[i].to_dict()
               
               if 'timestamp' in myrecord_dict and 
isinstance(myrecord_dict['timestamp'], self._pd.Timestamp):
                   myrecord_dict['timestamp'] = 
myrecord_dict['timestamp'].to_pydatetime()
               
               try:
                   del myrecord_dict['internal_sequence']
                   myrecord_object = self.obj(**myrecord_dict)
                   myrecord_list_resampled.append(myrecord_object)
               except Exception as e:
                   self.logger.error(f"Class {__class__.__name__}: Failed to 
create MyRecord in {self.__class__.__name__} from dict {myrecord_dict}: {e}")
                   continue 
   
           if not myrecord_list_resampled:
               self.logger.debug(f"{self.__class__.__name__}: Resampled list is 
empty after object conversion, not yielding.")
               yield None, None
           
           key_for_partitioning = 
int(myrecord_list_resampled[-1].timestamp.timestamp())
   
           yield key_for_partitioning, myrecord_list_resampled
   
   
   class ConvertToMyRecordBasic(beam.DoFn):
       """Parses a CSV string line into a MyRecord using 
convert_to_datapoint."""
   
       def __init__(self, obj):
           self.obj = obj
   
       def setup(self):
           import logging
           self.logger = logging
           import csv
           self.csv = csv
           # self.convert_to_datapoint is available globally
           # self.MyRecord is available globally
   
       def convert_to_datapoint(self, row: list, obj: type) -> 
tuple[Optional[NamedTuple], Optional[str]]:
           def get_dict_name_type_mapping(obj):
               dict_name_type_mapping = {}
               for column_name, column_typing in zip(obj._fields, 
obj.__annotations__.values()):
                   typing_args = get_args(column_typing)
                   if len(typing_args) == 2:
                       dict_name_type_mapping[column_name] = [t for t in 
typing_args if t is not type(None)][0]
                   else:
                       dict_name_type_mapping[column_name] = column_typing
               return dict_name_type_mapping
   
           dict_name_value_mapping = dict(zip(obj._fields, row))
           dict_name_type_mapping = get_dict_name_type_mapping(obj)
           converted_dict = {}
           imputation_values_dict = {}
           imputation_values_dict[int] = -1
           imputation_values_dict[float] = -1.0
           imputation_values_dict[str] = "missing"
           imputation_values_dict[datetime.datetime] = 
datetime.datetime.fromtimestamp(0, tz=datetime.timezone.utc)
   
           for feature_name, feature_value, feature_type in 
zip(dict_name_value_mapping.keys(), row, dict_name_type_mapping.values()):
               try:
                   if feature_value in imputation_values_dict.values():
                       # We have a feature whose value is something we use to 
indicate the absence of a value
                       raise ValueError(f"Feature {feature_name} has a value 
{feature_value} which we use to indicate absence of value.")
                   converted_dict[feature_name] = feature_type(feature_value)
               except Exception:
                   if feature_type == datetime.datetime:
                       formats_to_try = ['%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d 
%H:%M:%S', '%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S.%f 
%Z', '%Y-%m-%d %H:%M:%S %Z']
                       for fmt in formats_to_try:
                           try:
                               dt_obj = 
datetime.datetime.strptime(feature_value, fmt)
                               converted_dict[feature_name] = 
dt_obj.replace(tzinfo=datetime.timezone.utc) # Assume UTC if naive
                           except ValueError:
                               continue
                   else:
                       converted_dict[feature_name] = 
imputation_values_dict[feature_type]
   
           try:
               return obj(**converted_dict)
           except TypeError as e:
               self.logger.error(f"Class {__class__.__name__}: Final 
instantiation of {obj.__name__} failed with data {converted_dict}: {e}")
               return None
   
       def process(self, element: str):
           # Use csv module for more robust parsing than simple split
           try:
               # Wrap element in a list because csv.reader expects an iterable 
of
               # lines
               reader = self.csv.reader(
                   [element],
                   quotechar='"',
                   delimiter=",",
                   quoting=self.csv.QUOTE_ALL,
                   skipinitialspace=True)
               row_raw = next(reader)  # Get the first (and only) row
           except StopIteration:
               self.logger.warning(
                   f"Class {__class__.__name__}: CSV reader failed for element: 
'{element}'. Skipping.")
               yield
           except Exception as e:
               self.logger.error(
                   f"Class {__class__.__name__}: CSV parsing error for element 
'{element[:100]}...': {e}")
               yield  # Skip rows with parsing errors
   
           if row_raw:
               try:
                   datapoint = self.convert_to_datapoint(row_raw, self.obj)
                   yield datapoint
               except Exception as e:
                   self.logger.error(
                       f"Class {__class__.__name__}: Failed creating MyRecord 
from {datapoint} with exception {e}")
   
   options = {
       'runner': 'PrismRunner',  # Toggle with to DirectRunner/PrismRunner to 
avoid/create bug, comment out direct_num_workers and direct_running_mode as not 
applicable
       # 'direct_num_workers': 0,  # 0 often defaults to thread-based or single 
process
       # 'direct_running_mode': 'multi_processing',  # Or 'multi_threading'
       # 'semi_persistent_directory': tempfile.mkdtemp(), # Optional
       # 'profile_cpu': True,
       # 'profile_memory': True,
       # 'profile_location': "./beam_profile"
   }
   opts = PipelineOptions(flags=[], **options)
   output_base_path = "./prism"
   logger = logging
   
   
   def process_and_write(pcoll, split_name, output_path):
       split_output_dir = os.path.join(output_path, f"Split-{split_name}")
       try:
           if os.path.exists(split_output_dir):
               shutil.rmtree(split_output_dir)
           os.makedirs(split_output_dir)
           logger.info(f"Prepared output directory {split_output_dir}")
       except Exception as e:
           logger.warning(f"Could not manage directory {split_output_dir}: {e}")
   
       # This transform now has three outputs: main (TFRecord), sequences_data, 
and mapping_data
       processed_outputs = (
           pcoll
           | f"Extract_{split_name}{output_path}_Sequences" >> 
beam.MapTuple(lambda seq_id, steps: list(steps))
       )
   
   
   with beam.Pipeline(options=opts) as p:
       parsed_timestamped_pcollection = (
           p
           | "ReadCSV" >> beam.io.ReadFromText(INPUT_FILE_NO_HEADER, 
skip_header_lines=1) 
           | "ParseCSV" >> 
beam.ParDo(ConvertToMyRecordBasic(obj=MyRecord).with_output_types(MyRecord))
           | "AddEventTimestamps" >> 
AddTimestamps(obj=MyRecord).with_output_types(MyRecord)
           # | beam.combiners.Count.Globally() | beam.LogElements()
       )
       list_of_fixed_windows = []
       for offset in range(0, 60, 5):
           temp_windowed_grouped_sequences = (
               parsed_timestamped_pcollection
               # | "ApplySlidingWindow" >> beam.WindowInto(
               #     window.SlidingWindows(WINDOW_SIZE_SECONDS, 
WINDOW_PERIOD_SECONDS))
               | f"ApplyFixedWindowOffset{str(offset)}" >> 
beam.WindowInto(window.FixedWindows(WINDOW_SIZE_SECONDS, offset=offset))
               | f"ApplyMap{str(offset)}" >> beam.Map(lambda element, 
w=beam.DoFn.WindowParam: (int(w.start), element))
               # | f"ApplyGroupByKey{str(offset)}" >> 
beam.GroupByKey().with_output_types(tuple[int, list[MyRecord]])
               # | 
beam.CombinePerKey(beam.combiners.ToListCombineFn()).with_hot_key_fanout(4) 
#does not work with sliding windows
               | f"ApplyCombine{offset}" >> 
beam.CombinePerKey(beam.combiners.ToListCombineFn())
               | f"ResampleAndKey{offset}" >> 
beam.ParDo(ResampleWindowedDataAndAssignStartKey(
                   sampling_frequency=OUTPUT_FREQUENCY_SECONDS,
                   target_steps=TARGET_SEQUENCE_STEPS,
                   obj=MyRecord,
               )).with_output_types(tuple[int, list[MyRecord]])
           )
           list_of_fixed_windows.append(temp_windowed_grouped_sequences)
       windowed_grouped_sequences = list_of_fixed_windows | beam.Flatten()
       data_split = (
           windowed_grouped_sequences
           # Pass the side input to ParDo. It will be an extra argument to 
PartitionDoFn.process
           | "PartitionData" >> beam.ParDo(
               PartitionDoFn(fixed_threshold=1459503446),
           ).with_outputs('train', 'eval')
       )
       train_pcoll = data_split.train
       eval_pcoll = data_split.eval
       train_output = process_and_write(train_pcoll, "train", output_base_path)
       eval_output = process_and_write(eval_pcoll, "eval", output_base_path)
   ```
   
   ```
   # MyRecordSchema.py
   from typing import NamedTuple, Optional
   import datetime
   class MyRecord(NamedTuple):
       timestamp: datetime.datetime
       rybq: int
       qjmw: int
       ksel: int
       eqqq: int
       xrus: int
       biej: int
       gftv: int
       qcpd: Optional[int]
       hnox: Optional[int]
       fthm: float
       xkqg: Optional[float]
       fuil: float
       iusu: Optional[float]
       sczf: int
       atlf: int
       nedy: int
       kxyq: float
       gmla: float
       ekld: Optional[float]
       hwve: float
       tklu: Optional[float]
       pzoa: float
       qmkg: float
       pazj: float
       qdym: float
       xyio: float
       vxtg: float
       nzpk: float
       ypvj: float
       mohz: int
       vpfz: int
       efoo: float
       aggn: float
       wlwb: int
       bjqq: float
       wzbq: float
       juks: float
       dvdl: float
       avit: int
       nrha: int
       lejm: int
       kwyk: int
       prqy: Optional[int]
       putr: Optional[int]
       cpwn: Optional[int]
       dwlo: Optional[int]
       vhol: int
       wrmw: int
       gnmk: int
       neex: int
       ylgu: int
       kvyd: int
       ywge: int
       iuvv: int
       kaij: Optional[int]
       nbgs: Optional[int]
       llxf: Optional[int]
       mwie: Optional[int]
   ```
   
   The error I get is:
   
   ```
   Exception in thread wait_until_finish_read:
   Traceback (most recent call last):
     File 
"/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/threading.py", line 
1016, in _bootstrap_inner
   Traceback (most recent call last):
     File "/home/pritamdodeja/demo/template/debug_prism.py", line 554, in 
<module>
       self.run()
     File 
"/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/threading.py", line 
953, in run
       with beam.Pipeline(options=opts) as p:
     File 
"/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/pipeline.py",
 line 663, in __exit__
       self._target(*self._args, **self._kwargs)
     File 
"/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py",
 line 533, in read_messages
       self.result.wait_until_finish()
     File 
"/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py",
 line 568, in wait_until_finish
       for message in self._message_stream:
       raise self._runtime_exception
     File 
"/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/grpc/_channel.py",
 line 543, in __next__
     File 
"/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py",
 line 574, in _observe_state
       for state_response in self._state_stream:
     File 
"/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/grpc/_channel.py",
 line 543, in __next__
       return self._next()
     File 
"/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/grpc/_channel.py",
 line 969, in _next
       return self._next()
     File 
"/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/grpc/_channel.py",
 line 969, in _next
       raise self
   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC 
that terminated with:
           status = StatusCode.DEADLINE_EXCEEDED
           details = "Deadline Exceeded"
           debug_error_string = "UNKNOWN:Error received from peer  
{created_time:"2025-08-20T21:42:49.149954627+02:00", grpc_status:4, 
grpc_message:"Deadline Exceeded"}"
   >
       raise self
   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC 
that terminated with:
           status = StatusCode.DEADLINE_EXCEEDED
           details = "Deadline Exceeded"
           debug_error_string = "UNKNOWN:Error received from peer  
{created_time:"2025-08-20T21:42:49.150497414+02:00", grpc_status:4, 
grpc_message:"Deadline Exceeded"}"
   >
   Exception in thread 
run_worker_job-001[job]_ref_Environment_default_environment_1:
   Traceback (most recent call last):
     File 
"/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/threading.py", line 
1016, in _bootstrap_inner
       self.run()
     File 
"/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/threading.py", line 
953, in run
       self._target(*self._args, **self._kwargs)
     File 
"/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 283, in run
       getattr(self, SdkHarness.REQUEST_METHOD_PREFIX + request_type)(
     File 
"/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 343, in _request_process_bundle_progress
       self._request_process_bundle_action(request)
     File 
"/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 352, in _request_process_bundle_action
       self._report_progress_executor.submit(task)
     File 
"/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/concurrent/futures/thread.py",
 line 169, in submit
       raise RuntimeError('cannot schedule new futures after '
   RuntimeError: cannot schedule new futures after interpreter shutdown
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to