pritamdodeja commented on issue #35867: URL: https://github.com/apache/beam/issues/35867#issuecomment-3207983763
Here's how I can reproduce the bug: execute `debug_prism.py` with `MyRecordSchema.py` in the same directory. I had suspected this had something to do with Metrics, but I'm not so sure about that anymore.

```python
# debug_prism.py
import csv
import datetime
import logging
import os
import random
import shutil
import tempfile
from typing import (
    Any,
    NamedTuple,
    Optional,
    get_args,
)

import apache_beam as beam
from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window

from MyRecordSchema import MyRecord

# --- Configuration ---
OUTPUT_FILE = 'random_data.csv'
NUMBER_OF_ROWS = 5000  # PrismRunner fails at this number of rows, but succeeds for a lower number of rows (e.g. 2000)
START_DATE = datetime.datetime(2016, 1, 1, 0, 0, 0)
END_DATE = datetime.datetime(2016, 6, 1, 0, 0, 0)

print(f"Generating data for '{OUTPUT_FILE}' from {START_DATE} to {END_DATE} (max {NUMBER_OF_ROWS} rows)...")

current_time = START_DATE
rows_written = 0

DATA_CONFIG = {
    # Integers with a specific range
    "rybq": {"type": "integer", "params": (1, 5)},
    "qjmw": {"type": "integer", "params": (1, 5)},
    "ksel": {"type": "integer", "params": (1, 5)},
    "eqqq": {"type": "integer", "params": (2000, 2500)},
    "xrus": {"type": "integer", "params": (30, 50)},
    "biej": {"type": "integer", "params": (400, 500)},
    "gftv": {"type": "integer", "params": (350, 450)},
    "qcpd": {"type": "integer", "params": (400, 450)},
    "hnox": {"type": "integer", "params": (60, 80)},
    # Normally distributed floats
    "fthm": {"type": "normal", "params": (65.0, 2.0)},
    "xkqg": {"type": "normal", "params": (65.0, 2.0)},
    "fuil": {"type": "normal", "params": (78.0, 1.0)},
    "iusu": {"type": "normal", "params": (78.0, 1.0)},
    "sczf": {"type": "integer", "params": (1, 5)},
    "atlf": {"type": "integer", "params": (2300, 2500)},
    "nedy": {"type": "integer", "params": (30, 40)},
    # Uniformly distributed floats
    "kxyq": {"type": "uniform", "params": (20.0, 30.0)},
    "gmla": {"type": "uniform", "params": (60.0, 70.0)},
    "ekld": {"type": "uniform", "params": (60.0, 70.0)},
    "hwve": {"type": "uniform", "params": (80.0, 90.0)},
    "tklu": {"type": "uniform", "params": (80.0, 90.0)},
    "pzoa": {"type": "uniform", "params": (0.0, 5.0)},
    "qmkg": {"type": "uniform", "params": (0.0, 5.0)},
    "pazj": {"type": "uniform", "params": (0.0, 1.0)},
    "qdym": {"type": "uniform", "params": (0.0, 1.0)},
    "xyio": {"type": "uniform", "params": (50.0, 60.0)},
    "vxtg": {"type": "uniform", "params": (25.0, 35.0)},
    "nzpk": {"type": "uniform", "params": (25.0, 35.0)},
    "ypvj": {"type": "uniform", "params": (20.0, 25.0)},
    "mohz": {"type": "integer", "params": (1, 10)},
    "vpfz": {"type": "integer", "params": (50, 60)},
    "efoo": {"type": "uniform", "params": (25.0, 35.0)},
    "aggn": {"type": "uniform", "params": (25.0, 35.0)},
    "wlwb": {"type": "integer", "params": (20, 30)},
    "bjqq": {"type": "uniform", "params": (70.0, 80.0)},
    "wzbq": {"type": "uniform", "params": (40.0, 50.0)},
    "juks": {"type": "uniform", "params": (30.0, 40.0)},
    "dvdl": {"type": "uniform", "params": (20.0, 30.0)},
    # Categorical data
    "avit": {"type": "categorical", "params": [0, 1]},
    "nrha": {"type": "categorical", "params": [0, 1]},
    "lejm": {"type": "categorical", "params": [0, 1]},
    "kwyk": {"type": "categorical", "params": [0, 1]},
    "prqy": {"type": "categorical", "params": [0, 1]},
    "putr": {"type": "categorical", "params": [0, 1]},
    "cpwn": {"type": "categorical", "params": [0, 1]},
    "dwlo": {"type": "categorical", "params": [0, 1]},
    "vhol": {"type": "categorical", "params": [1, 2, 3]},
    "wrmw": {"type": "categorical", "params": [1, 2, 3]},
    "gnmk": {"type": "categorical", "params": [1, 2, 3]},
    "neex": {"type": "categorical", "params": [1, 2, 3]},
    "ylgu": {"type": "categorical", "params": [1, 2, 3, 4]},
    "kvyd": {"type": "categorical", "params": [1, 2, 3, 4]},
    "ywge": {"type": "categorical", "params": [1, 2, 3, 4]},
    "iuvv": {"type": "categorical", "params": [1, 2, 3, 4]},
    "kaij": {"type": "categorical", "params": [1, 2, 3, 4, 5]},
    "nbgs": {"type": "categorical", "params": [1, 2, 3, 4, 5]},
    "llxf": {"type": "categorical", "params": [1, 2, 3, 4, 5]},
    "mwie": {"type": "categorical", "params": [1, 2, 3, 4, 5]},
}

MISSING_VALUE_CONFIG = {
    "probability": 0.15,  # 15% chance for a value to be missing
    "features": [
        "cpwn", "dwlo", "ekld", "hnox", "iusu", "kaij", "llxf", "mwie",
        "nbgs", "prqy", "putr", "qcpd", "tklu", "xkqg"
    ]
}

HEADER = ["timestamp"] + list(DATA_CONFIG.keys())


def generate_random_row(current_timestamp):
    """Generates a single row of random data based on the DATA_CONFIG."""
    row = {"timestamp": current_timestamp.strftime('%Y-%m-%d %H:%M:%S')}
    for feature, config in DATA_CONFIG.items():
        # Check if this feature should have a missing value
        if feature in MISSING_VALUE_CONFIG["features"] and random.random() < MISSING_VALUE_CONFIG["probability"]:
            row[feature] = ''
            continue
        # Generate data based on the specified type
        if config["type"] == "normal":
            mean, std_dev = config["params"]
            row[feature] = random.normalvariate(mean, std_dev)
        elif config["type"] == "uniform":
            min_val, max_val = config["params"]
            row[feature] = random.uniform(min_val, max_val)
        elif config["type"] == "integer":
            min_val, max_val = config["params"]
            row[feature] = random.randint(min_val, max_val)
        elif config["type"] == "categorical":
            row[feature] = random.choice(config["params"])
        else:
            row[feature] = ''
    return [row[h] for h in HEADER]


with open(OUTPUT_FILE, 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    # Write the header row
    writer.writerow(HEADER)
    # Write the data rows until we reach the end date or max rows
    while current_time <= END_DATE and rows_written < NUMBER_OF_ROWS:
        # Increment the timestamp by a random amount (e.g., 1 to 300 seconds)
        current_time += datetime.timedelta(seconds=random.randint(1, 300))
        row_data = generate_random_row(current_time)
        writer.writerow(row_data)
        rows_written += 1

print(f"Data generation complete. Wrote {rows_written} rows.")

# Data Processing Section
OUTPUT_FREQUENCY_SECONDS = 60
TARGET_SEQUENCE_STEPS = 20
WINDOW_SIZE_SECONDS = TARGET_SEQUENCE_STEPS * OUTPUT_FREQUENCY_SECONDS
INPUT_FILE_NO_HEADER = "./random_data.csv"

beam.coders.registry.register_coder(MyRecord, beam.coders.RowCoder)


class PartitionDoFn(beam.DoFn):
    def __init__(self, fixed_threshold=None):
        self._fixed_threshold = fixed_threshold

    def setup(self):
        import logging
        self.logger = logging

    def process(self, element_tuple, threshold_from_side_input=None):
        current_key = element_tuple[0]
        actual_threshold = self._fixed_threshold
        if actual_threshold is None:
            if threshold_from_side_input is None:
                self.logger.error(f"Class {__class__.__name__} PartitionDoFn: No fixed threshold set and no threshold provided from side input. Halting.")
                raise ValueError("PartitionDoFn requires a threshold.")
            actual_threshold = threshold_from_side_input
        if current_key <= actual_threshold:
            yield beam.pvalue.TaggedOutput('train', element_tuple)
        else:
            yield beam.pvalue.TaggedOutput('eval', element_tuple)


class AddTimestamps(beam.PTransform):
    """Assigns event timestamps to records based on their 'timestamp' field."""

    def __init__(self, obj):
        self.obj = obj

    class _AddTimestampsDoFn(beam.DoFn):  # Renamed inner class
        def setup(self):
            import logging
            self.logger = logging

        def process(self, element):
            """Yields the element wrapped in TimestampedValue."""
            try:
                ts = element.timestamp
                if not isinstance(ts, datetime.datetime):
                    self.logger.error(f"Class {__class__.__name__}: AddTimestamps: Expected datetime, got {type(ts)} for {element}")
                beam_timestamp = ts.timestamp()
                yield beam.window.TimestampedValue(element, beam_timestamp)
            except Exception as e:
                self.logger.error(f"Class {__class__.__name__}: Error in AddTimestamps for UUID {element}, timestamp '{element.timestamp}': {e}")

    def expand(self, input_pcollection):
        return input_pcollection | "AddTS_DoFn" >> beam.ParDo(self._AddTimestampsDoFn())


class ResampleWindowedDataAndAssignStartKey(beam.DoFn):
    def __init__(self, sampling_frequency=30, target_steps=20, obj=None):
        self._sampling_frequency = sampling_frequency
        self._target_steps = target_steps
        self.processed_windows_counter = Metrics.counter(self.__class__.__name__, 'processed_windows')
        self.empty_windows_counter = Metrics.counter(self.__class__.__name__, 'empty_or_skipped_windows')
        self.records_per_window_dist = Metrics.distribution(self.__class__.__name__, 'records_per_window')
        self.obj = obj

    def setup(self):
        import logging
        self.logger = logging
        import pandas as pd
        self._pd = pd
        from datetime import datetime, timedelta
        self._datetime = datetime
        self._timedelta = timedelta
        import numpy as np
        self._np = np

    def resample_asof(
        self,
        myrecord_df,
        start_time,
        sampling_frequency,
        num_target_steps,
    ):
        """
        Resamples a DataFrame to a new frequency starting from a specified time
        for a specific number of steps, using 'as of' logic.
        """
        expected_cols = ['timestamp']
        if myrecord_df is not None and hasattr(myrecord_df, 'columns'):
            expected_cols.extend([col for col in myrecord_df.columns if col != 'timestamp'])
        if myrecord_df is None or myrecord_df.empty:
            self.logger.debug(f"{self.__class__.__name__}.resample_asof received empty or None DataFrame.")
            return self._pd.DataFrame(columns=expected_cols)
        df = myrecord_df.copy()
        if 'timestamp' not in df.columns:
            if df.index.name == 'timestamp' and isinstance(df.index, self._pd.DatetimeIndex):
                df = df.reset_index()
            else:
                self.logger.error(f"Class {__class__.__name__} 'timestamp' column not found in DataFrame for resample_asof.")
                return self._pd.DataFrame(columns=expected_cols)
        if df.empty:
            self.logger.debug(f"Class {__class__.__name__} DataFrame became empty after handling timestamp column for resample_asof.")
            return self._pd.DataFrame(columns=expected_cols)
        if not self._pd.api.types.is_datetime64_any_dtype(df['timestamp']):
            try:
                df['timestamp'] = self._pd.to_datetime(df['timestamp'])
            except Exception as e:
                self.logger.error(f"Class {__class__.__name__}: Could not convert 'timestamp' column to datetime in resample_asof: {e}")
                return self._pd.DataFrame(columns=expected_cols)
        df = df.sort_values('timestamp').reset_index(drop=True)
        if df.empty:
            self.logger.debug("DataFrame empty after sorting in resample_asof.")
            return self._pd.DataFrame(columns=expected_cols)
        if not isinstance(start_time, self._datetime):
            self.logger.error(f"Class {__class__.__name__}: resample_asof expects start_time to be datetime.datetime, got {type(start_time)}")
            return self._pd.DataFrame(columns=expected_cols)
        actual_sampling_frequency = sampling_frequency
        if not (actual_sampling_frequency and hasattr(actual_sampling_frequency, 'total_seconds') and actual_sampling_frequency.total_seconds() > 0):
            self.logger.error(f"Class {__class__.__name__}: Invalid actual_sampling_frequency: {actual_sampling_frequency} in resample_asof.")
            return self._pd.DataFrame(columns=expected_cols)
        try:
            df = df.set_index('timestamp')
        except Exception as e:
            self.logger.error(f"Class {__class__.__name__}: Error setting 'timestamp' as index in resample_asof: {e}")
            return self._pd.DataFrame(columns=expected_cols)
        df_filtered = df[df.index <= start_time + self._timedelta(seconds=self._target_steps * self._sampling_frequency)]
        if df_filtered.empty:
            self.logger.debug(f"Class {__class__.__name__}: DataFrame empty after filtering by start_time '{start_time}' in resample_asof.")
            return self._pd.DataFrame(columns=expected_cols)
        resampled_index = None
        if num_target_steps > 0:
            try:
                resampled_index = self._pd.date_range(
                    start=start_time,
                    periods=num_target_steps,
                    freq=actual_sampling_frequency
                )
            except ValueError as e:
                self.logger.error(f"Class {__class__.__name__}: Error creating date_range with start={start_time}, periods={num_target_steps}, freq={actual_sampling_frequency}: {e}")
                return self._pd.DataFrame(columns=expected_cols)
        else:
            self.logger.warning(f"Class {__class__.__name__}: num_target_steps is not positive ({num_target_steps}), cannot generate resampled_index.")
            return self._pd.DataFrame(columns=expected_cols)
        if resampled_index is None or resampled_index.empty:
            self.logger.warning("Resampled index is None or empty in resample_asof.")
            return self._pd.DataFrame(columns=expected_cols)
        resampled_df = df_filtered.reindex(resampled_index, method='ffill')
        if resampled_df.empty:
            self.logger.debug("DataFrame empty after reindex in resample_asof.")
            return self._pd.DataFrame(columns=expected_cols)
        resampled_df = resampled_df.reset_index()
        resampled_df = resampled_df.rename(columns={'index': 'timestamp'})
        value_columns = [col for col in resampled_df.columns if col != 'timestamp']
        if value_columns:
            resampled_df = resampled_df.dropna(subset=value_columns, how='all')
        if resampled_df.empty:
            self.logger.debug(f"Class {__class__.__name__} Resampled DataFrame became empty after dropna in resample_asof.")
            return self._pd.DataFrame(columns=expected_cols)
        final_ordered_columns = [col for col in expected_cols if col in resampled_df.columns]
        for col in resampled_df.columns:
            if col not in final_ordered_columns:
                final_ordered_columns.append(col)
        return resampled_df[final_ordered_columns]

    def process(
        self, element: tuple[int, list[Any]], window=beam.DoFn.WindowParam
    ):
        window_start_key, records_list = element
        num_records = len(records_list)
        self.processed_windows_counter.inc()
        self.records_per_window_dist.update(num_records)
        if num_records > 1000:  # Adjust this threshold as needed
            self.logger.warning(
                f"Processing a LARGE window (key: {window_start_key}) with {num_records} records. "
                f"This may indicate data skew and is a likely bottleneck."
            )
        if not records_list:
            self.logger.debug(f"{self.__class__.__name__}.process received empty list.")
            self.empty_windows_counter.inc()
            yield None, None
        if not element[1]:
            self.logger.debug(f"{self.__class__.__name__}.process received empty list.")
            yield None, None
        # Timestamps in 'element' should be datetime.datetime objects
        sorted_original_list = sorted(element[1], key=lambda x: x.timestamp)
        if not sorted_original_list:
            # This case should ideally not be reached if 'element' is not empty
            self.logger.debug(f"{self.__class__.__name__}: sorted_original_list is empty.")
            yield None, None
        dlist = [d._asdict() for d in sorted_original_list]
        myrecord_df_original = self._pd.DataFrame(dlist)
        if myrecord_df_original.empty or 'timestamp' not in myrecord_df_original.columns:
            self.logger.debug(f"{self.__class__.__name__}: DataFrame from original list is empty or missing 'timestamp'.")
            yield None, None
        start_time_for_resampling = myrecord_df_original.iloc[0]['timestamp']
        if isinstance(start_time_for_resampling, self._pd.Timestamp):
            start_time_for_resampling = start_time_for_resampling.to_pydatetime()
        if self._pd.isna(start_time_for_resampling):
            self.logger.warning(f"{self.__class__.__name__}: start_time_for_resampling is NaT.")
            yield None, None
        sampling_timedelta = self._timedelta(seconds=self._sampling_frequency)
        resampled_df = self.resample_asof(
            myrecord_df_original,
            start_time=start_time_for_resampling,
            sampling_frequency=sampling_timedelta,
            num_target_steps=self._target_steps
        )
        if resampled_df.empty or 'timestamp' not in resampled_df.columns:
            self.logger.debug(f"{self.__class__.__name__}: Resampling for window starting near {start_time_for_resampling} yielded empty/invalid DataFrame.")
            yield None, None
        # Calculate 'internal_sequence' for the resampled data.
        # Assumes resampled_df['timestamp'] are UTC-aware pandas Timestamps.
        resampled_df['internal_sequence'] = (resampled_df['timestamp'].astype(self._np.int64) // 10**9)  # Removed: + 18000
        myrecord_list_resampled = []
        for i in range(len(resampled_df)):
            myrecord_dict = resampled_df.iloc[i].to_dict()
            if 'timestamp' in myrecord_dict and isinstance(myrecord_dict['timestamp'], self._pd.Timestamp):
                myrecord_dict['timestamp'] = myrecord_dict['timestamp'].to_pydatetime()
            try:
                del myrecord_dict['internal_sequence']
                myrecord_object = self.obj(**myrecord_dict)
                myrecord_list_resampled.append(myrecord_object)
            except Exception as e:
                self.logger.error(f"Class {__class__.__name__}: Failed to create MyRecord in {self.__class__.__name__} from dict {myrecord_dict}: {e}")
                continue
        if not myrecord_list_resampled:
            self.logger.debug(f"{self.__class__.__name__}: Resampled list is empty after object conversion, not yielding.")
            yield None, None
        key_for_partitioning = int(myrecord_list_resampled[-1].timestamp.timestamp())
        yield key_for_partitioning, myrecord_list_resampled


class ConvertToMyRecordBasic(beam.DoFn):
    """Parses a CSV string line into a MyRecord using convert_to_datapoint."""

    def __init__(self, obj):
        self.obj = obj

    def setup(self):
        import logging
        self.logger = logging
        import csv
        self.csv = csv
        # self.convert_to_datapoint is available globally
        # self.MyRecord is available globally

    def convert_to_datapoint(self, row: list, obj: type) -> tuple[Optional[NamedTuple], Optional[str]]:
        def get_dict_name_type_mapping(obj):
            dict_name_type_mapping = {}
            for column_name, column_typing in zip(obj._fields, obj.__annotations__.values()):
                typing_args = get_args(column_typing)
                if len(typing_args) == 2:
                    dict_name_type_mapping[column_name] = [t for t in typing_args if t is not type(None)][0]
                else:
                    dict_name_type_mapping[column_name] = column_typing
            return dict_name_type_mapping

        dict_name_value_mapping = dict(zip(obj._fields, row))
        dict_name_type_mapping = get_dict_name_type_mapping(obj)
        converted_dict = {}
        imputation_values_dict = {}
        imputation_values_dict[int] = -1
        imputation_values_dict[float] = -1.0
        imputation_values_dict[str] = "missing"
        imputation_values_dict[datetime.datetime] = datetime.datetime.fromtimestamp(0, tz=datetime.timezone.utc)
        for feature_name, feature_value, feature_type in zip(dict_name_value_mapping.keys(), row, dict_name_type_mapping.values()):
            try:
                if feature_value in imputation_values_dict.values():
                    # We have a feature whose value is something we use to indicate the absence of a value
                    raise ValueError(f"Feature {feature_name} has a value {feature_value} which we use to indicate absence of value.")
                converted_dict[feature_name] = feature_type(feature_value)
            except Exception:
                if feature_type == datetime.datetime:
                    formats_to_try = ['%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M:%S',
                                      '%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S',
                                      '%Y-%m-%d %H:%M:%S.%f %Z', '%Y-%m-%d %H:%M:%S %Z']
                    for fmt in formats_to_try:
                        try:
                            dt_obj = datetime.datetime.strptime(feature_value, fmt)
                            converted_dict[feature_name] = dt_obj.replace(tzinfo=datetime.timezone.utc)  # Assume UTC if naive
                        except ValueError:
                            continue
                else:
                    converted_dict[feature_name] = imputation_values_dict[feature_type]
        try:
            return obj(**converted_dict)
        except TypeError as e:
            self.logger.error(f"Class {__class__.__name__}: Final instantiation of {obj.__name__} failed with data {converted_dict}: {e}")
            return None

    def process(self, element: str):
        # Use csv module for more robust parsing than simple split
        try:
            # Wrap element in a list because csv.reader expects an iterable of lines
            reader = self.csv.reader(
                [element],
                quotechar='"',
                delimiter=",",
                quoting=self.csv.QUOTE_ALL,
                skipinitialspace=True)
            row_raw = next(reader)  # Get the first (and only) row
        except StopIteration:
            self.logger.warning(
                f"Class {__class__.__name__}: CSV reader failed for element: '{element}'. Skipping.")
            yield
        except Exception as e:
            self.logger.error(
                f"Class {__class__.__name__}: CSV parsing error for element '{element[:100]}...': {e}")
            yield  # Skip rows with parsing errors
        if row_raw:
            try:
                datapoint = self.convert_to_datapoint(row_raw, self.obj)
                yield datapoint
            except Exception as e:
                self.logger.error(
                    f"Class {__class__.__name__}: Failed creating MyRecord from {datapoint} with exception {e}")


options = {
    'runner': 'PrismRunner',  # Toggle to DirectRunner/PrismRunner to avoid/create the bug; comment out direct_num_workers and direct_running_mode as they are not applicable
    # 'direct_num_workers': 0,  # 0 often defaults to thread-based or single process
    # 'direct_running_mode': 'multi_processing',  # Or 'multi_threading'
    # 'semi_persistent_directory': tempfile.mkdtemp(),  # Optional
    # 'profile_cpu': True,
    # 'profile_memory': True,
    # 'profile_location': "./beam_profile"
}
opts = PipelineOptions(flags=[], **options)
output_base_path = "./prism"
logger = logging


def process_and_write(pcoll, split_name, output_path):
    split_output_dir = os.path.join(output_path, f"Split-{split_name}")
    try:
        if os.path.exists(split_output_dir):
            shutil.rmtree(split_output_dir)
        os.makedirs(split_output_dir)
        logger.info(f"Prepared output directory {split_output_dir}")
    except Exception as e:
        logger.warning(f"Could not manage directory {split_output_dir}: {e}")
    # This transform now has three outputs: main (TFRecord), sequences_data, and mapping_data
    processed_outputs = (
        pcoll
        | f"Extract_{split_name}{output_path}_Sequences" >> beam.MapTuple(lambda seq_id, steps: list(steps))
    )


with beam.Pipeline(options=opts) as p:
    parsed_timestamped_pcollection = (
        p
        | "ReadCSV" >> beam.io.ReadFromText(INPUT_FILE_NO_HEADER, skip_header_lines=1)
        | "ParseCSV" >> beam.ParDo(ConvertToMyRecordBasic(obj=MyRecord).with_output_types(MyRecord))
        | "AddEventTimestamps" >> AddTimestamps(obj=MyRecord).with_output_types(MyRecord)
        # | beam.combiners.Count.Globally()
        | beam.LogElements()
    )
    list_of_fixed_windows = []
    for offset in range(0, 60, 5):
        temp_windowed_grouped_sequences = (
            parsed_timestamped_pcollection
            # | "ApplySlidingWindow" >> beam.WindowInto(
            #     window.SlidingWindows(WINDOW_SIZE_SECONDS, WINDOW_PERIOD_SECONDS))
            | f"ApplyFixedWindowOffset{str(offset)}" >> beam.WindowInto(window.FixedWindows(WINDOW_SIZE_SECONDS, offset=offset))
            | f"ApplyMap{str(offset)}" >> beam.Map(lambda element, w=beam.DoFn.WindowParam: (int(w.start), element))
            # | f"ApplyGroupByKey{str(offset)}" >> beam.GroupByKey().with_output_types(tuple[int, list[MyRecord]])
            # | beam.CombinePerKey(beam.combiners.ToListCombineFn()).with_hot_key_fanout(4)  # does not work with sliding windows
            | f"ApplyCombine{offset}" >> beam.CombinePerKey(beam.combiners.ToListCombineFn())
            | f"ResampleAndKey{offset}" >> beam.ParDo(ResampleWindowedDataAndAssignStartKey(
                sampling_frequency=OUTPUT_FREQUENCY_SECONDS,
                target_steps=TARGET_SEQUENCE_STEPS,
                obj=MyRecord,
            )).with_output_types(tuple[int, list[MyRecord]])
        )
        list_of_fixed_windows.append(temp_windowed_grouped_sequences)
    windowed_grouped_sequences = list_of_fixed_windows | beam.Flatten()
    data_split = (
        windowed_grouped_sequences
        # Pass the side input to ParDo. It will be an extra argument to PartitionDoFn.process
        | "PartitionData" >> beam.ParDo(
            PartitionDoFn(fixed_threshold=1459503446),
        ).with_outputs('train', 'eval')
    )
    train_pcoll = data_split.train
    eval_pcoll = data_split.eval
    train_output = process_and_write(train_pcoll, "train", output_base_path)
    eval_output = process_and_write(eval_pcoll, "eval", output_base_path)
```

```python
# MyRecordSchema.py
from typing import NamedTuple, Optional
import datetime


class MyRecord(NamedTuple):
    timestamp: datetime.datetime
    rybq: int
    qjmw: int
    ksel: int
    eqqq: int
    xrus: int
    biej: int
    gftv: int
    qcpd: Optional[int]
    hnox: Optional[int]
    fthm: float
    xkqg: Optional[float]
    fuil: float
    iusu: Optional[float]
    sczf: int
    atlf: int
    nedy: int
    kxyq: float
    gmla: float
    ekld: Optional[float]
    hwve: float
    tklu: Optional[float]
    pzoa: float
    qmkg: float
    pazj: float
    qdym: float
    xyio: float
    vxtg: float
    nzpk: float
    ypvj: float
    mohz: int
    vpfz: int
    efoo: float
    aggn: float
    wlwb: int
    bjqq: float
    wzbq: float
    juks: float
    dvdl: float
    avit: int
    nrha: int
    lejm: int
    kwyk: int
    prqy: Optional[int]
    putr: Optional[int]
    cpwn: Optional[int]
    dwlo: Optional[int]
    vhol: int
    wrmw: int
    gnmk: int
    neex: int
    ylgu: int
    kvyd: int
    ywge: int
    iuvv: int
    kaij: Optional[int]
    nbgs: Optional[int]
    llxf: Optional[int]
    mwie: Optional[int]
```

The error I get is (two tracebacks interleaved in the output):

```
Exception in thread wait_until_finish_read:
Traceback (most recent call last):
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
Traceback (most recent call last):
  File "/home/pritamdodeja/demo/template/debug_prism.py", line 554, in <module>
    self.run()
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/threading.py", line 953, in run
    with beam.Pipeline(options=opts) as p:
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/pipeline.py", line 663, in __exit__
    self._target(*self._args, **self._kwargs)
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py", line 533, in read_messages
    self.result.wait_until_finish()
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py", line 568, in wait_until_finish
    for message in self._message_stream:
    raise self._runtime_exception
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/grpc/_channel.py", line 543, in __next__
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py", line 574, in _observe_state
    for state_response in self._state_stream:
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/grpc/_channel.py", line 543, in __next__
    return self._next()
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/grpc/_channel.py", line 969, in _next
    return self._next()
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/grpc/_channel.py", line 969, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.DEADLINE_EXCEEDED
        details = "Deadline Exceeded"
        debug_error_string = "UNKNOWN:Error received from peer {created_time:"2025-08-20T21:42:49.149954627+02:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.DEADLINE_EXCEEDED
        details = "Deadline Exceeded"
        debug_error_string = "UNKNOWN:Error received from peer {created_time:"2025-08-20T21:42:49.150497414+02:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
Exception in thread run_worker_job-001[job]_ref_Environment_default_environment_1:
Traceback (most recent call last):
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 283, in run
    getattr(self, SdkHarness.REQUEST_METHOD_PREFIX + request_type)(
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 343, in _request_process_bundle_progress
    self._request_process_bundle_action(request)
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 352, in _request_process_bundle_action
    self._report_progress_executor.submit(task)
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/concurrent/futures/thread.py", line 169, in submit
    raise RuntimeError('cannot schedule new futures after '
RuntimeError: cannot schedule new futures after interpreter shutdown
```