Dear,
I am using pyflink to develop a stream processing system. In one of the
processes I have processed a keyed_stream (which should be giving out a
data_stream according to my knowledge). Later I am trying to add a sink to
this data_stream. While this is erroring out, the same sink applied to
another data stream works fine. I am not able to decode the error.
error while sinking:
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC
that terminated with:
status = StatusCode.CANCELLED
details = "Multiplexer hanging up"
debug_error_string = "UNKNOWN:Error received from peer
ipv6:%5B::1%5D:34683 {created_time:"2024-08-09T18:20:28.159147552+05:30",
grpc_status:1, grpc_message:"Multiplexer hanging up"}"
the process function on keyed stream:
class process_load_data(KeyedProcessFunction):
def open(self,runtime_context):
self.state = runtime_context.get_map_state(MapStateDescriptor("my_state",
Types.INT(), Types.FLOAT()))
def process_element(self, value, ctx: ProcessFunction.Context):
data = json.loads(value)
previous_state = self.state.get(data[IO_ID_INDEX])
self.state.put(data[IO_ID_INDEX], data[VALUE_INDEX])
if not previous_state or ( previous_state and previous_state != data[value]
):
yield value
and the function call for sink and processing:
kafka_sink = FlinkKafkaProducer(
topic="load_data",
serialization_schema=SimpleStringSchema(),
producer_config=kafka_props
)
raw_data_stream = env.from_source(kafka_source, WatermarkStrategy.
for_monotonous_timestamps(),"Raw_Data")
load_data = raw_data_stream.key_by(key_selector()).process(process_load_data
())
load_data.add_sink(kafka_sink)
python version - 3.10.12
flink version - 1.19.1
I am using a standalone flink cluster