Copilot commented on code in PR #1826: URL: https://github.com/apache/iggy/pull/1826#discussion_r2112464459
########## core/connectors/runtime/src/sink.rs: ########## @@ -0,0 +1,359 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use dlopen2::wrapper::Container; +use futures::StreamExt; +use iggy::prelude::{ + AutoCommit, AutoCommitWhen, IggyClient, IggyConsumer, IggyDuration, IggyMessage, + PollingStrategy, +}; +use iggy_connector_sdk::{ + DecodedMessage, MessagesMetadata, RawMessage, RawMessages, ReceivedMessage, StreamDecoder, + TopicMetadata, sink::ConsumeCallback, transforms::Transform, +}; +use std::{ + collections::HashMap, + str::FromStr, + sync::{Arc, atomic::Ordering}, + time::Instant, +}; +use tracing::{error, info, warn}; + +use crate::{ + PLUGIN_ID, RuntimeError, SinkApi, SinkConnector, SinkConnectorConsumer, SinkConnectorPlugin, + SinkConnectorWrapper, configs::SinkConfig, resolve_plugin_path, transform, +}; + +pub async fn init( + sink_configs: HashMap<String, SinkConfig>, + iggy_client: &IggyClient, +) -> Result<HashMap<String, SinkConnector>, RuntimeError> { + let mut sink_connectors: HashMap<String, SinkConnector> = HashMap::new(); + for (key, config) in sink_configs { + let name = config.name; + if !config.enabled { + warn!("Sink: {name} is disabled ({key})"); + continue; + } + + let plugin_id 
= PLUGIN_ID.load(Ordering::Relaxed); + let path = resolve_plugin_path(&config.path); + info!("Initializing sink container with name: {name} ({key}), plugin: {path}",); + if let Some(container) = sink_connectors.get_mut(&path) { + info!("Sink container for plugin: {path} is already loaded.",); + init_sink( + &container.container, + &config.config.unwrap_or_default(), + plugin_id, + ); + container.plugins.push(SinkConnectorPlugin { + id: plugin_id, + consumers: vec![], + }); + } else { + let container: Container<SinkApi> = + unsafe { Container::load(&path).expect("Failed to load sink container") }; + info!("Sink container for plugin: {path} loaded successfully.",); + init_sink(&container, &config.config.unwrap_or_default(), plugin_id); + sink_connectors.insert( + path.to_owned(), + SinkConnector { + container, + plugins: vec![SinkConnectorPlugin { + id: plugin_id, + consumers: vec![], + }], + }, + ); + } + + info!( + "Sink container with name: {name} ({key}), initialized successfully with ID: {plugin_id}." 
+ ); + PLUGIN_ID.fetch_add(1, Ordering::Relaxed); + + let transforms = if let Some(transforms_config) = config.transforms { + let transforms = transform::load(transforms_config).expect("Failed to load transforms"); + let types = transforms + .iter() + .map(|t| t.r#type().into()) + .collect::<Vec<&'static str>>() + .join(", "); + info!("Enabled transforms for sink: {name} ({key}): {types}",); + transforms + } else { + vec![] + }; + + let connector = sink_connectors + .get_mut(&path) + .expect("Failed to get sink connector"); + let plugin = connector + .plugins + .iter_mut() + .find(|p| p.id == plugin_id) + .expect("Failed to get sink plugin"); + + for stream in config.streams { + let poll_interval = + IggyDuration::from_str(&stream.poll_interval.unwrap_or("5ms".to_owned())) + .expect("Invalid poll interval"); + let consumer_group = stream + .consumer_group + .unwrap_or(format!("iggy-connect-{key}")); + let batch_size = stream.batch_size.unwrap_or(1000); + for topic in stream.topics { + let mut consumer = iggy_client + .consumer_group(&consumer_group, &stream.stream, &topic)? 
+ .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages)) + .create_consumer_group_if_not_exists() + .auto_join_consumer_group() + .polling_strategy(PollingStrategy::next()) + .poll_interval(poll_interval) + .batch_size(batch_size) + .build(); + + consumer.init().await?; + plugin.consumers.push(SinkConnectorConsumer { + consumer, + decoder: stream.schema.decoder(), + batch_size, + transforms: transforms.clone(), + }); + } + } + } + + Ok(sink_connectors) +} + +pub fn consume(sinks: Vec<SinkConnectorWrapper>) { + for sink in sinks { + for plugin in sink.plugins { + info!("Starting consume for sink with ID: {}...", plugin.id); + for consumer in plugin.consumers { + tokio::spawn(async move { + if let Err(error) = consume_messages( + plugin.id, + consumer.decoder, + consumer.batch_size, + sink.callback, + consumer.transforms, + consumer.consumer, + ) + .await + { + error!( + "Failed to consume messages for sink connector with ID: {}. {error}", + plugin.id + ); + return; + } + info!( + "Consume messages for sink connector with ID: {} started successfully.", + plugin.id + ); + }); + } + } + } +} + +async fn consume_messages( + plugin_id: u32, + decoder: Arc<dyn StreamDecoder>, + batch_size: u32, + consume: ConsumeCallback, + transforms: Vec<Arc<dyn Transform>>, + mut consumer: IggyConsumer, +) -> Result<(), RuntimeError> { + info!("Started consuming messages for sink connector with ID: {plugin_id}"); + let batch_size = batch_size as usize; + let mut batch = Vec::with_capacity(batch_size); + let topic_metadata = TopicMetadata { + stream: consumer.stream().to_string(), + topic: consumer.topic().to_string(), + }; + + while let Some(message) = consumer.next().await { + let Ok(message) = message else { + error!("Failed to receive message."); + continue; + }; + + let partition_id = message.partition_id; + let current_offset = message.current_offset; + let message_offset = message.message.header.offset; + batch.push(message.message); + if current_offset != message_offset 
&& batch.len() < batch_size { + continue; + } + + let messages = std::mem::take(&mut batch); + let messages_count = messages.len(); + let messages_metadata = MessagesMetadata { + partition_id, + current_offset, + schema: decoder.schema(), + }; + info!( + "Processing {messages_count} messages for sink connector with ID: {}", + plugin_id + ); + let start = Instant::now(); + if let Err(error) = process_messages( + plugin_id, + messages_metadata, + &topic_metadata, + messages, + &consume, + &transforms, + &decoder, + ) + .await + { + error!( + "Failed to process {messages_count} messages for sink connector with ID: {plugin_id}. {error}", + ); + continue; + } + + let elapsed = start.elapsed(); + info!( + "Consumed {messages_count} messages in {:#?} for sink connector with ID: {plugin_id}", + elapsed + ); + } + info!("Stopped consuming messages for sink connector with ID: {plugin_id}"); + Ok(()) +} + +fn init_sink(container: &Container<SinkApi>, config: &serde_json::Value, id: u32) { + let config = serde_json::to_string(config).expect("Invalid sink config."); + (container.open)(id, config.as_ptr(), config.len()); +} + +async fn process_messages( + plugin_id: u32, + messages_metadata: MessagesMetadata, + topic_metadata: &TopicMetadata, + messages: Vec<IggyMessage>, + consume: &ConsumeCallback, + transforms: &Vec<Arc<dyn Transform>>, + decoder: &Arc<dyn StreamDecoder>, +) -> Result<(), RuntimeError> { + let messages = messages.into_iter().map(|message| ReceivedMessage { + id: message.header.id, + offset: message.header.offset, + headers: message.user_headers_map().unwrap_or_default(), + payload: message.payload.into(), + }); + Review Comment: `len()` is not a method of `Iterator` itself; it is provided by `ExactSizeIterator`. The `map` over `Vec::into_iter()` shown here is exact-size, but calling `.len()` on `messages` fails to compile once `messages` is (re)bound to an adapter that is not exact-size, such as `filter` or `filter_map`. To get a count that works regardless of the adapters used, collect into a Vec first (e.g., `let msgs: Vec<_> = messages.collect(); let count = msgs.len();`). ```suggestion let messages: Vec<_> = messages.collect(); ``` -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
