Copilot commented on code in PR #2557: URL: https://github.com/apache/iggy/pull/2557#discussion_r2693215812
########## core/connectors/sinks/redshift_sink/src/config.rs: ########## @@ -0,0 +1,243 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use iggy_connector_sdk::Error; +use serde::{Deserialize, Serialize}; + +/// Configuration for the Redshift Sink Connector. +/// +/// This connector loads data from Iggy streams into Amazon Redshift using S3 staging, +/// which is the recommended approach for high-volume data loading. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RedshiftSinkConfig { + /// Redshift connection string in PostgreSQL format. + /// Example: `postgres://user:[email protected]:5439/database` + pub connection_string: String, + + /// Target table name in Redshift. Can include schema prefix. + /// Example: `public.events` or `analytics.user_actions` + pub target_table: String, + + /// IAM role ARN for Redshift to access S3. Preferred over access keys. + /// Example: `arn:aws:iam::123456789012:role/RedshiftS3Access` + pub iam_role: Option<String>, + + /// S3 bucket name for staging CSV files before COPY. + pub s3_bucket: String, + + /// S3 key prefix for staged files. Defaults to empty string. + /// Example: `staging/redshift/` + pub s3_prefix: Option<String>, + + /// AWS region for the S3 bucket. + /// Example: `us-east-1` + pub s3_region: String, + + /// Custom S3 endpoint URL for testing with LocalStack or MinIO. + /// If not specified, uses the default AWS S3 endpoint. + /// Example: `http://localhost:4566` + pub s3_endpoint: Option<String>, + + /// AWS access key ID. Required if IAM role is not specified. + pub aws_access_key_id: Option<String>, + + /// AWS secret access key. Required if IAM role is not specified. + pub aws_secret_access_key: Option<String>, + + /// Number of messages to batch before uploading to S3 and executing COPY. + /// Defaults to 10000. + pub batch_size: Option<u32>, + + /// Maximum time in milliseconds to wait before flushing a partial batch. + /// Defaults to 30000 (30 seconds). + pub flush_interval_ms: Option<u64>, + + /// CSV field delimiter character. Defaults to `,`. + pub csv_delimiter: Option<char>, + + /// CSV quote character for escaping. Defaults to `"`. + pub csv_quote: Option<char>, + + /// Number of header rows to skip. Defaults to 0. + pub ignore_header: Option<u32>, + + /// Maximum number of errors allowed before COPY fails. Defaults to 0. + pub max_errors: Option<u32>, + + /// Compression format for staged files: `gzip`, `lzop`, `bzip2`, or `none`. + pub compression: Option<String>, + + /// Whether to delete staged S3 files after successful COPY. Defaults to true. + pub delete_staged_files: Option<bool>, + + /// Maximum number of database connections. Defaults to 5. + pub max_connections: Option<u32>, + + /// Maximum number of retry attempts for transient failures. 
Defaults to 3. + pub max_retries: Option<u32>, + + /// Initial delay in milliseconds between retries. Uses exponential backoff. + /// Defaults to 1000. + pub retry_delay_ms: Option<u64>, + + /// Whether to include Iggy metadata columns (offset, timestamp, stream, topic, partition). + /// Defaults to false. + pub include_metadata: Option<bool>, + + /// Whether to auto-create the target table if it doesn't exist. Defaults to false. + pub auto_create_table: Option<bool>, +} + +impl RedshiftSinkConfig { + /// Validates the configuration and returns an error if invalid. + pub fn validate(&self) -> Result<(), Error> { + if self.connection_string.is_empty() { + return Err(Error::InvalidConfig); + } + + if self.target_table.is_empty() { + return Err(Error::InvalidConfig); + } + + if self.s3_bucket.is_empty() { + return Err(Error::InvalidConfig); + } + + if self.s3_region.is_empty() { + return Err(Error::InvalidConfig); + } + + // Validate AWS credentials: either IAM role or access keys must be provided + let has_iam_role = self.iam_role.as_ref().is_some_and(|r| !r.is_empty()); + let has_access_key = self + .aws_access_key_id + .as_ref() + .is_some_and(|k| !k.is_empty()); + let has_secret_key = self + .aws_secret_access_key + .as_ref() + .is_some_and(|s| !s.is_empty()); + + if !(has_iam_role || (has_access_key && has_secret_key)) { + return Err(Error::InvalidConfig); + } + + // If using access keys, both must be provided + if (has_access_key && !has_secret_key) || (!has_access_key && has_secret_key) { + return Err(Error::InvalidConfig); + } + + // Validate compression if specified + if let Some(compression) = &self.compression { + let valid = ["gzip", "lzop", "bzip2", "none", "zstd"]; Review Comment: Redshift COPY command does not support 'zstd' compression format. According to AWS documentation, Redshift COPY only supports GZIP, LZOP, BZIP2, and ZSTD is not listed as a valid compression option. This will cause runtime errors when users specify 'zstd' compression. Remove 'zstd' from the valid compression formats list. ```suggestion let valid = ["gzip", "lzop", "bzip2", "none"]; ``` ########## core/connectors/sinks/redshift_sink/src/lib.rs: ########## @@ -0,0 +1,484 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +mod config; +mod s3; + +use async_trait::async_trait; +use config::RedshiftSinkConfig; +use iggy_connector_sdk::{ + ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, sink_connector, +}; +use s3::S3Uploader; +use sqlx::{Pool, Postgres, postgres::PgPoolOptions}; +use std::time::Duration; +use tokio::sync::Mutex; +use tracing::{error, info, warn}; + +sink_connector!(RedshiftSink); + +#[derive(Debug)] +pub struct RedshiftSink { + id: u32, + config: RedshiftSinkConfig, + pool: Option<Pool<Postgres>>, + s3_uploader: Option<S3Uploader>, + state: Mutex<SinkState>, +} + +#[derive(Debug, Default)] +struct SinkState { + messages_processed: u64, + batches_loaded: u64, + load_errors: u64, +} + +impl RedshiftSink { + pub fn new(id: u32, config: RedshiftSinkConfig) -> Self { + RedshiftSink { + id, + config, + pool: None, + s3_uploader: None, + state: Mutex::new(SinkState::default()), + } + } + + async fn connect_redshift(&mut self) -> Result<(), Error> { + let max_connections = self.config.max_connections.unwrap_or(5); + let redacted = self + .config + .connection_string + .chars() + .take(20) + .collect::<String>(); + + info!( + "Connecting to Redshift with max {} connections, connection: {}...", + max_connections, redacted + ); + + let pool = PgPoolOptions::new() + .max_connections(max_connections) + .acquire_timeout(Duration::from_secs(30)) + .connect(&self.config.connection_string) + .await + .map_err(|e| Error::InitError(format!("Failed to connect to Redshift: {e}")))?; + + sqlx::query("SELECT 1") + .execute(&pool) + .await + .map_err(|e| Error::InitError(format!("Redshift connectivity test failed: {e}")))?; + + self.pool = Some(pool); + info!("Connected to Redshift cluster"); + Ok(()) + } + + fn init_s3_uploader(&mut self) -> Result<(), Error> { + let uploader = S3Uploader::new( + &self.config.s3_bucket, + self.config.s3_prefix.as_deref().unwrap_or(""), + &self.config.s3_region, + self.config.aws_access_key_id.as_deref(), + self.config.aws_secret_access_key.as_deref(), + self.config.s3_endpoint.as_deref(), + )?; + self.s3_uploader = Some(uploader); + info!( + "Initialized S3 uploader for bucket: {}, region: {}{}", + self.config.s3_bucket, + self.config.s3_region, + self.config + .s3_endpoint + .as_ref() + .map_or(String::new(), |e| format!(", endpoint: {}", e)) + ); + Ok(()) + } + + async fn ensure_table_exists(&self) -> Result<(), Error> { + if !self.config.auto_create_table.unwrap_or(false) { + return Ok(()); + } + + let pool = self + .pool + .as_ref() + .ok_or_else(|| Error::InitError("Database not connected".to_string()))?; + + let table_name = &self.config.target_table; + let include_metadata = self.config.include_metadata.unwrap_or(false); + + let mut sql = format!( + "CREATE TABLE IF NOT EXISTS {table_name} ( + id VARCHAR(40) PRIMARY KEY, + payload VARCHAR(MAX)" + ); + + if include_metadata { + sql.push_str( + ", + iggy_offset BIGINT, + iggy_timestamp TIMESTAMP, + iggy_stream VARCHAR(256), + iggy_topic VARCHAR(256), + iggy_partition_id INTEGER", + ); + } + + sql.push_str( + ", + created_at TIMESTAMP DEFAULT GETDATE() + )", + ); + + sqlx::query(&sql) + .execute(pool) + .await + .map_err(|e| Error::InitError(format!("Failed to create table '{table_name}': {e}")))?; + + info!("Ensured table '{}' exists in Redshift", table_name); + Ok(()) + } + + async fn process_batch( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + messages: &[ConsumedMessage], + ) -> Result<(), Error> { + if messages.is_empty() { + return Ok(()); + } + + let 
s3_uploader = self + .s3_uploader + .as_ref() + .ok_or_else(|| Error::InitError("S3 uploader not initialized".to_string()))?; + + let pool = self + .pool + .as_ref() + .ok_or_else(|| Error::InitError("Database not connected".to_string()))?; + + // Convert messages to CSV + let csv_data = self.messages_to_csv(topic_metadata, messages_metadata, messages)?; + + // Upload to S3 + let s3_key = s3_uploader.upload_csv(&csv_data).await?; + let s3_path = format!("s3://{}/{}", self.config.s3_bucket, s3_key); + + info!( + "Uploaded {} messages ({} bytes) to {}", + messages.len(), + csv_data.len(), + s3_path + ); + + // Execute COPY command + let copy_result = self.execute_copy(pool, &s3_path).await; + + // Cleanup S3 file if configured + if self.config.delete_staged_files.unwrap_or(true) + && let Err(e) = s3_uploader.delete_file(&s3_key).await + { + warn!("Failed to delete staged file {}: {}", s3_key, e); + } + + copy_result?; Review Comment: The cleanup logic has a critical flaw. If execute_copy fails, the function returns early at line 204 (copy_result?) without attempting to delete the staged S3 file. This will leave orphaned files in S3 when COPY operations fail. The cleanup should occur regardless of the COPY command result, or at minimum should be attempted even on failure to avoid accumulating staging files. ########## core/connectors/sinks/redshift_sink/src/lib.rs: ########## @@ -0,0 +1,484 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +mod config; +mod s3; + +use async_trait::async_trait; +use config::RedshiftSinkConfig; +use iggy_connector_sdk::{ + ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, sink_connector, +}; +use s3::S3Uploader; +use sqlx::{Pool, Postgres, postgres::PgPoolOptions}; +use std::time::Duration; +use tokio::sync::Mutex; +use tracing::{error, info, warn}; + +sink_connector!(RedshiftSink); + +#[derive(Debug)] +pub struct RedshiftSink { + id: u32, + config: RedshiftSinkConfig, + pool: Option<Pool<Postgres>>, + s3_uploader: Option<S3Uploader>, + state: Mutex<SinkState>, +} + +#[derive(Debug, Default)] +struct SinkState { + messages_processed: u64, + batches_loaded: u64, + load_errors: u64, +} + +impl RedshiftSink { + pub fn new(id: u32, config: RedshiftSinkConfig) -> Self { + RedshiftSink { + id, + config, + pool: None, + s3_uploader: None, + state: Mutex::new(SinkState::default()), + } + } + + async fn connect_redshift(&mut self) -> Result<(), Error> { + let max_connections = self.config.max_connections.unwrap_or(5); + let redacted = self + .config + .connection_string + .chars() + .take(20) + .collect::<String>(); + + info!( + "Connecting to Redshift with max {} connections, connection: {}...", + max_connections, redacted + ); + + let pool = PgPoolOptions::new() + .max_connections(max_connections) + .acquire_timeout(Duration::from_secs(30)) + .connect(&self.config.connection_string) + .await + .map_err(|e| Error::InitError(format!("Failed to connect to Redshift: {e}")))?; + + sqlx::query("SELECT 1") + .execute(&pool) + .await + .map_err(|e| Error::InitError(format!("Redshift connectivity test failed: {e}")))?; + + self.pool = Some(pool); + info!("Connected to Redshift cluster"); + Ok(()) + } + + fn init_s3_uploader(&mut self) -> Result<(), Error> { + let uploader = S3Uploader::new( + &self.config.s3_bucket, + self.config.s3_prefix.as_deref().unwrap_or(""), + &self.config.s3_region, + self.config.aws_access_key_id.as_deref(), + self.config.aws_secret_access_key.as_deref(), + self.config.s3_endpoint.as_deref(), + )?; + self.s3_uploader = Some(uploader); + info!( + "Initialized S3 uploader for bucket: {}, region: {}{}", + self.config.s3_bucket, + self.config.s3_region, + self.config + .s3_endpoint + .as_ref() + .map_or(String::new(), |e| format!(", endpoint: {}", e)) + ); + Ok(()) + } + + async fn ensure_table_exists(&self) -> Result<(), Error> { + if !self.config.auto_create_table.unwrap_or(false) { + return Ok(()); + } + + let pool = self + .pool + .as_ref() + .ok_or_else(|| Error::InitError("Database not connected".to_string()))?; + + let table_name = &self.config.target_table; + let include_metadata = self.config.include_metadata.unwrap_or(false); + + let mut sql = format!( + "CREATE TABLE IF NOT EXISTS {table_name} ( + id VARCHAR(40) PRIMARY KEY, + payload VARCHAR(MAX)" + ); + + if include_metadata { + sql.push_str( + ", + iggy_offset BIGINT, + iggy_timestamp TIMESTAMP, + iggy_stream VARCHAR(256), + iggy_topic VARCHAR(256), + iggy_partition_id INTEGER", + ); + } + + sql.push_str( + ", + created_at TIMESTAMP DEFAULT GETDATE() + )", + ); + + sqlx::query(&sql) + .execute(pool) + .await + .map_err(|e| Error::InitError(format!("Failed to create table '{table_name}': {e}")))?; + + info!("Ensured table '{}' exists in Redshift", table_name); + Ok(()) + } + + async fn process_batch( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + messages: &[ConsumedMessage], + ) -> Result<(), Error> { + if messages.is_empty() { + return Ok(()); + } + + let 
s3_uploader = self + .s3_uploader + .as_ref() + .ok_or_else(|| Error::InitError("S3 uploader not initialized".to_string()))?; + + let pool = self + .pool + .as_ref() + .ok_or_else(|| Error::InitError("Database not connected".to_string()))?; + + // Convert messages to CSV + let csv_data = self.messages_to_csv(topic_metadata, messages_metadata, messages)?; + + // Upload to S3 + let s3_key = s3_uploader.upload_csv(&csv_data).await?; + let s3_path = format!("s3://{}/{}", self.config.s3_bucket, s3_key); + + info!( + "Uploaded {} messages ({} bytes) to {}", + messages.len(), + csv_data.len(), + s3_path + ); + + // Execute COPY command + let copy_result = self.execute_copy(pool, &s3_path).await; + + // Cleanup S3 file if configured + if self.config.delete_staged_files.unwrap_or(true) + && let Err(e) = s3_uploader.delete_file(&s3_key).await + { + warn!("Failed to delete staged file {}: {}", s3_key, e); + } + + copy_result?; + + let mut state = self.state.lock().await; + state.messages_processed += messages.len() as u64; + state.batches_loaded += 1; + + info!( + "Redshift sink ID: {} loaded {} messages to table '{}' (total: {}, batches: {})", + self.id, + messages.len(), + self.config.target_table, + state.messages_processed, + state.batches_loaded + ); + + Ok(()) + } + + fn messages_to_csv( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + messages: &[ConsumedMessage], + ) -> Result<Vec<u8>, Error> { + let delimiter = self.config.csv_delimiter.unwrap_or(','); + let quote = self.config.csv_quote.unwrap_or('"'); + let include_metadata = self.config.include_metadata.unwrap_or(false); + + let mut csv_output = Vec::new(); + + for message in messages { + let payload_str = match &message.payload { + Payload::Json(value) => simd_json::to_string(value).unwrap_or_default(), + Payload::Text(text) => text.clone(), + Payload::Raw(bytes) => String::from_utf8_lossy(bytes).to_string(), + _ => { + let bytes = message.payload.clone().try_into_vec().map_err(|e| { + error!("Failed to convert payload: {}", e); + Error::InvalidRecord + })?; + String::from_utf8_lossy(&bytes).to_string() + } + }; + + // Escape quotes in payload + let escaped_payload = payload_str.replace(quote, &format!("{quote}{quote}")); Review Comment: Creating a new String via format! macro for every message is inefficient. Since quote is always a single character, this can be optimized to use a string literal or a pre-allocated string. Consider using a match statement or caching the escaped quote string. ########## core/connectors/sinks/redshift_sink/src/lib.rs: ########## @@ -0,0 +1,484 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +mod config; +mod s3; + +use async_trait::async_trait; +use config::RedshiftSinkConfig; +use iggy_connector_sdk::{ + ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, sink_connector, +}; +use s3::S3Uploader; +use sqlx::{Pool, Postgres, postgres::PgPoolOptions}; +use std::time::Duration; +use tokio::sync::Mutex; +use tracing::{error, info, warn}; + +sink_connector!(RedshiftSink); + +#[derive(Debug)] +pub struct RedshiftSink { + id: u32, + config: RedshiftSinkConfig, + pool: Option<Pool<Postgres>>, + s3_uploader: Option<S3Uploader>, + state: Mutex<SinkState>, +} + +#[derive(Debug, Default)] +struct SinkState { + messages_processed: u64, + batches_loaded: u64, + load_errors: u64, +} + +impl RedshiftSink { + pub fn new(id: u32, config: RedshiftSinkConfig) -> Self { + RedshiftSink { + id, + config, + pool: None, + s3_uploader: None, + state: Mutex::new(SinkState::default()), + } + } + + async fn connect_redshift(&mut self) -> Result<(), Error> { + let max_connections = self.config.max_connections.unwrap_or(5); + let redacted = self + .config + .connection_string + .chars() + .take(20) + .collect::<String>(); + + info!( + "Connecting to Redshift with max {} connections, connection: {}...", + max_connections, redacted + ); + + let pool = PgPoolOptions::new() + .max_connections(max_connections) + .acquire_timeout(Duration::from_secs(30)) + .connect(&self.config.connection_string) + .await + .map_err(|e| Error::InitError(format!("Failed to connect to Redshift: {e}")))?; + + sqlx::query("SELECT 1") + .execute(&pool) + .await + .map_err(|e| Error::InitError(format!("Redshift connectivity test failed: {e}")))?; + + self.pool = Some(pool); + info!("Connected to Redshift cluster"); + Ok(()) + } + + fn init_s3_uploader(&mut self) -> Result<(), Error> { + let uploader = S3Uploader::new( + &self.config.s3_bucket, + self.config.s3_prefix.as_deref().unwrap_or(""), + &self.config.s3_region, + self.config.aws_access_key_id.as_deref(), + self.config.aws_secret_access_key.as_deref(), + self.config.s3_endpoint.as_deref(), + )?; + self.s3_uploader = Some(uploader); + info!( + "Initialized S3 uploader for bucket: {}, region: {}{}", + self.config.s3_bucket, + self.config.s3_region, + self.config + .s3_endpoint + .as_ref() + .map_or(String::new(), |e| format!(", endpoint: {}", e)) + ); + Ok(()) + } + + async fn ensure_table_exists(&self) -> Result<(), Error> { + if !self.config.auto_create_table.unwrap_or(false) { + return Ok(()); + } + + let pool = self + .pool + .as_ref() + .ok_or_else(|| Error::InitError("Database not connected".to_string()))?; + + let table_name = &self.config.target_table; + let include_metadata = self.config.include_metadata.unwrap_or(false); + + let mut sql = format!( + "CREATE TABLE IF NOT EXISTS {table_name} ( + id VARCHAR(40) PRIMARY KEY, + payload VARCHAR(MAX)" + ); + + if include_metadata { + sql.push_str( + ", + iggy_offset BIGINT, + iggy_timestamp TIMESTAMP, + iggy_stream VARCHAR(256), + iggy_topic VARCHAR(256), + iggy_partition_id INTEGER", + ); + } + + sql.push_str( + ", + created_at TIMESTAMP DEFAULT GETDATE() + )", + ); + + sqlx::query(&sql) + .execute(pool) + .await + .map_err(|e| Error::InitError(format!("Failed to create table '{table_name}': {e}")))?; + + info!("Ensured table '{}' exists in Redshift", table_name); + Ok(()) + } + + async fn process_batch( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + messages: &[ConsumedMessage], + ) -> Result<(), Error> { + if messages.is_empty() { + return Ok(()); + } + + let 
s3_uploader = self + .s3_uploader + .as_ref() + .ok_or_else(|| Error::InitError("S3 uploader not initialized".to_string()))?; + + let pool = self + .pool + .as_ref() + .ok_or_else(|| Error::InitError("Database not connected".to_string()))?; + + // Convert messages to CSV + let csv_data = self.messages_to_csv(topic_metadata, messages_metadata, messages)?; + + // Upload to S3 + let s3_key = s3_uploader.upload_csv(&csv_data).await?; + let s3_path = format!("s3://{}/{}", self.config.s3_bucket, s3_key); + + info!( + "Uploaded {} messages ({} bytes) to {}", + messages.len(), + csv_data.len(), + s3_path + ); + + // Execute COPY command + let copy_result = self.execute_copy(pool, &s3_path).await; + + // Cleanup S3 file if configured + if self.config.delete_staged_files.unwrap_or(true) + && let Err(e) = s3_uploader.delete_file(&s3_key).await + { + warn!("Failed to delete staged file {}: {}", s3_key, e); + } + + copy_result?; + + let mut state = self.state.lock().await; + state.messages_processed += messages.len() as u64; + state.batches_loaded += 1; + + info!( + "Redshift sink ID: {} loaded {} messages to table '{}' (total: {}, batches: {})", + self.id, + messages.len(), + self.config.target_table, + state.messages_processed, + state.batches_loaded + ); + + Ok(()) + } + + fn messages_to_csv( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + messages: &[ConsumedMessage], + ) -> Result<Vec<u8>, Error> { + let delimiter = self.config.csv_delimiter.unwrap_or(','); + let quote = self.config.csv_quote.unwrap_or('"'); + let include_metadata = self.config.include_metadata.unwrap_or(false); + + let mut csv_output = Vec::new(); + + for message in messages { + let payload_str = match &message.payload { + Payload::Json(value) => simd_json::to_string(value).unwrap_or_default(), + Payload::Text(text) => text.clone(), + Payload::Raw(bytes) => String::from_utf8_lossy(bytes).to_string(), + _ => { + let bytes = message.payload.clone().try_into_vec().map_err(|e| { + error!("Failed to convert payload: {}", e); + Error::InvalidRecord + })?; + String::from_utf8_lossy(&bytes).to_string() + } + }; + + // Escape quotes in payload + let escaped_payload = payload_str.replace(quote, &format!("{quote}{quote}")); + + let mut row = format!( + "{}{delim}{quote}{payload}{quote}", + message.id, + delim = delimiter, + payload = escaped_payload + ); + + if include_metadata { + let timestamp_secs = message.timestamp / 1_000_000; + let timestamp = chrono::DateTime::from_timestamp(timestamp_secs as i64, 0) + .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string()) Review Comment: The timestamp conversion assumes microseconds (dividing by 1_000_000), but Iggy message timestamps are typically in microseconds and this conversion to seconds loses precision. This should preserve microseconds or the comment should clarify the expected timestamp unit. Additionally, if the timestamp is actually in microseconds, the division should account for proper conversion to match the SQL TIMESTAMP type expectations. ```suggestion // `message.timestamp` is in microseconds. Preserve microsecond precision // by converting to seconds and nanoseconds for `from_timestamp`. 
let timestamp_micros = message.timestamp; let timestamp_secs = (timestamp_micros / 1_000_000) as i64; let timestamp_nanos = ((timestamp_micros % 1_000_000) as u32) * 1_000; let timestamp = chrono::DateTime::from_timestamp(timestamp_secs, timestamp_nanos) .map(|dt| dt.format("%Y-%m-%d %H:%M:%S%.6f").to_string()) ``` ########## core/integration/tests/connectors/redshift/mod.rs: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::connectors::{ConnectorsRuntime, IggySetup, setup_runtime}; +use std::collections::HashMap; +use testcontainers_modules::{ + localstack::LocalStack, + postgres, + testcontainers::{ContainerAsync, runners::AsyncRunner}, +}; + +mod redshift_sink; + +/// Holds the test containers to keep them alive during tests. +struct RedshiftTestContainers { + _postgres: ContainerAsync<postgres::Postgres>, + _localstack: ContainerAsync<LocalStack>, +} + +/// Setup result containing both runtime and containers. +struct RedshiftTestSetup { + runtime: ConnectorsRuntime, + _containers: RedshiftTestContainers, +} + +async fn setup() -> RedshiftTestSetup { + // Start PostgreSQL container (simulating Redshift as they share the same wire protocol) + let postgres_container = postgres::Postgres::default() + .start() + .await + .expect("Failed to start Postgres (Redshift simulator)"); + let postgres_port = postgres_container + .get_host_port_ipv4(5432) + .await + .expect("Failed to get Postgres port"); + + // Start LocalStack for S3 + let localstack_container = LocalStack::default() + .start() + .await + .expect("Failed to start LocalStack"); + let localstack_port = localstack_container + .get_host_port_ipv4(4566) + .await + .expect("Failed to get LocalStack port"); + + // Create S3 bucket using LocalStack S3 API + let s3_endpoint = format!("http://localhost:{localstack_port}"); + let bucket_name = "iggy-redshift-staging"; + + // Create the bucket via LocalStack S3 API using path-style URL + let client = reqwest::Client::new(); + let create_bucket_url = format!("{s3_endpoint}/{bucket_name}"); + let _ = client.put(&create_bucket_url).send().await; Review Comment: The bucket creation result is silently ignored with `let _ =`. If bucket creation fails, subsequent tests will fail in confusing ways. The error should be handled explicitly with `.expect()` to provide clear test failure messages, similar to how other container setup failures are handled in this function. 
```suggestion client .put(&create_bucket_url) .send() .await .expect("Failed to create S3 bucket in LocalStack"); ``` ########## core/connectors/sinks/redshift_sink/README.md: ########## @@ -0,0 +1,235 @@ +# Apache Iggy - Redshift Sink Connector + +A sink connector that loads data from Iggy streams into Amazon Redshift using the S3 staging method. This is the recommended approach for high-volume data loading into Redshift. + +## Overview + +The Redshift Sink Connector: + +1. **Buffers** incoming messages into batches +2. **Uploads** batches as CSV files to S3 +3. **Executes** Redshift COPY command to load data from S3 +4. **Cleans up** staged S3 files after successful load + +This approach leverages Redshift's massively parallel processing (MPP) architecture for efficient bulk loading. + +## Prerequisites + +- Amazon Redshift cluster with network access +- S3 bucket for staging files +- AWS credentials with appropriate permissions: + - S3: `s3:PutObject`, `s3:GetObject`, `s3:DeleteObject` on the staging bucket + - Redshift: `COPY` permission on the target table + +## Configuration + +Create a connector configuration file (e.g., `redshift.toml`): + +```toml +type = "sink" +key = "redshift" +enabled = true +version = 0 +name = "Redshift Sink" +path = "target/release/libiggy_connector_redshift_sink" +plugin_config_format = "toml" + +[[streams]] +stream = "events" +topics = ["user_actions"] +schema = "json" +batch_length = 10000 +poll_interval = "100ms" +consumer_group = "redshift_sink" + +[plugin_config] +# Redshift connection (PostgreSQL wire protocol) +connection_string = "postgres://admin:[email protected]:5439/mydb" +target_table = "public.events" + +# S3 staging configuration +s3_bucket = "my-staging-bucket" +s3_prefix = "redshift/staging/" +s3_region = "us-east-1" + +# AWS authentication - use either IAM role (preferred) or access keys +iam_role = "arn:aws:iam::123456789012:role/RedshiftS3Access" + +# Or use access keys instead of IAM role: +# aws_access_key_id = "AKIAIOSFODNN7EXAMPLE" +# aws_secret_access_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" + +# Batching settings +batch_size = 10000 +flush_interval_ms = 30000 + +# CSV format options +csv_delimiter = "," +csv_quote = "\"" + +# COPY command options +max_errors = 10 +# compression = "gzip" + +# Cleanup and reliability +delete_staged_files = true +max_retries = 3 +retry_delay_ms = 1000 + +# Database settings +max_connections = 5 +auto_create_table = false + +# Metadata columns (adds iggy_offset, iggy_timestamp, etc.) 
+include_metadata = false +``` + +## Configuration Reference + +| Property | Type | Required | Default | Description | +| -------- | ---- | -------- | ------- | ----------- | +| `connection_string` | String | Yes | - | Redshift connection string in PostgreSQL format | +| `target_table` | String | Yes | - | Target table name (can include schema) | +| `s3_bucket` | String | Yes | - | S3 bucket for staging CSV files | +| `s3_region` | String | Yes | - | AWS region for the S3 bucket | +| `s3_prefix` | String | No | `""` | S3 key prefix for staged files | +| `iam_role` | String | No* | - | IAM role ARN for Redshift to access S3 | +| `aws_access_key_id` | String | No* | - | AWS access key ID | +| `aws_secret_access_key` | String | No* | - | AWS secret access key | +| `batch_size` | Integer | No | `10000` | Messages per batch before S3 upload | +| `flush_interval_ms` | Integer | No | `30000` | Max wait time before flushing partial batch | +| `csv_delimiter` | Char | No | `,` | CSV field delimiter | +| `csv_quote` | Char | No | `"` | CSV quote character | +| `max_errors` | Integer | No | `0` | Max errors before COPY fails | +| `compression` | String | No | `none` | Compression: `gzip`, `lzop`, `bzip2`, `zstd` | Review Comment: The documentation lists 'zstd' as a supported compression format, but Redshift COPY does not support ZSTD compression. This is inconsistent with AWS Redshift documentation and will mislead users. Remove 'zstd' from the supported compression formats list. ```suggestion | `compression` | String | No | `none` | Compression: `gzip`, `lzop`, `bzip2` | ``` ########## core/connectors/sinks/redshift_sink/src/lib.rs: ########## @@ -0,0 +1,484 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +mod config; +mod s3; + +use async_trait::async_trait; +use config::RedshiftSinkConfig; +use iggy_connector_sdk::{ + ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, sink_connector, +}; +use s3::S3Uploader; +use sqlx::{Pool, Postgres, postgres::PgPoolOptions}; +use std::time::Duration; +use tokio::sync::Mutex; +use tracing::{error, info, warn}; + +sink_connector!(RedshiftSink); + +#[derive(Debug)] +pub struct RedshiftSink { + id: u32, + config: RedshiftSinkConfig, + pool: Option<Pool<Postgres>>, + s3_uploader: Option<S3Uploader>, + state: Mutex<SinkState>, +} + +#[derive(Debug, Default)] +struct SinkState { + messages_processed: u64, + batches_loaded: u64, + load_errors: u64, +} + +impl RedshiftSink { + pub fn new(id: u32, config: RedshiftSinkConfig) -> Self { + RedshiftSink { + id, + config, + pool: None, + s3_uploader: None, + state: Mutex::new(SinkState::default()), + } + } + + async fn connect_redshift(&mut self) -> Result<(), Error> { + let max_connections = self.config.max_connections.unwrap_or(5); + let redacted = self + .config + .connection_string + .chars() + .take(20) + .collect::<String>(); + + info!( + "Connecting to Redshift with max {} connections, connection: {}...", + max_connections, redacted + ); + + let pool = PgPoolOptions::new() + .max_connections(max_connections) + .acquire_timeout(Duration::from_secs(30)) + .connect(&self.config.connection_string) + .await + .map_err(|e| Error::InitError(format!("Failed to connect to Redshift: {e}")))?; + + sqlx::query("SELECT 1") + .execute(&pool) + .await + .map_err(|e| Error::InitError(format!("Redshift connectivity test failed: {e}")))?; + + self.pool = Some(pool); + info!("Connected to Redshift cluster"); + Ok(()) + } + + fn init_s3_uploader(&mut self) -> Result<(), Error> { + let uploader = S3Uploader::new( + &self.config.s3_bucket, + self.config.s3_prefix.as_deref().unwrap_or(""), + &self.config.s3_region, + self.config.aws_access_key_id.as_deref(), + self.config.aws_secret_access_key.as_deref(), + self.config.s3_endpoint.as_deref(), + )?; + self.s3_uploader = Some(uploader); + info!( + "Initialized S3 uploader for bucket: {}, region: {}{}", + self.config.s3_bucket, + self.config.s3_region, + self.config + .s3_endpoint + .as_ref() + .map_or(String::new(), |e| format!(", endpoint: {}", e)) + ); + Ok(()) + } + + async fn ensure_table_exists(&self) -> Result<(), Error> { + if !self.config.auto_create_table.unwrap_or(false) { + return Ok(()); + } + + let pool = self + .pool + .as_ref() + .ok_or_else(|| Error::InitError("Database not connected".to_string()))?; + + let table_name = &self.config.target_table; + let include_metadata = self.config.include_metadata.unwrap_or(false); + + let mut sql = format!( + "CREATE TABLE IF NOT EXISTS {table_name} ( + id VARCHAR(40) PRIMARY KEY, + payload VARCHAR(MAX)" + ); + + if include_metadata { + sql.push_str( + ", + iggy_offset BIGINT, + iggy_timestamp TIMESTAMP, + iggy_stream VARCHAR(256), + iggy_topic VARCHAR(256), + iggy_partition_id INTEGER", + ); + } + + sql.push_str( + ", + created_at TIMESTAMP DEFAULT GETDATE() + )", + ); + + sqlx::query(&sql) + .execute(pool) + .await + .map_err(|e| Error::InitError(format!("Failed to create table '{table_name}': {e}")))?; + + info!("Ensured table '{}' exists in Redshift", table_name); + Ok(()) + } + + async fn process_batch( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + messages: &[ConsumedMessage], + ) -> Result<(), Error> { + if messages.is_empty() { + return Ok(()); + } + + let 
s3_uploader = self + .s3_uploader + .as_ref() + .ok_or_else(|| Error::InitError("S3 uploader not initialized".to_string()))?; + + let pool = self + .pool + .as_ref() + .ok_or_else(|| Error::InitError("Database not connected".to_string()))?; + + // Convert messages to CSV + let csv_data = self.messages_to_csv(topic_metadata, messages_metadata, messages)?; + + // Upload to S3 + let s3_key = s3_uploader.upload_csv(&csv_data).await?; + let s3_path = format!("s3://{}/{}", self.config.s3_bucket, s3_key); + + info!( + "Uploaded {} messages ({} bytes) to {}", + messages.len(), + csv_data.len(), + s3_path + ); + + // Execute COPY command + let copy_result = self.execute_copy(pool, &s3_path).await; + + // Cleanup S3 file if configured + if self.config.delete_staged_files.unwrap_or(true) + && let Err(e) = s3_uploader.delete_file(&s3_key).await + { + warn!("Failed to delete staged file {}: {}", s3_key, e); + } + + copy_result?; + + let mut state = self.state.lock().await; + state.messages_processed += messages.len() as u64; + state.batches_loaded += 1; + + info!( + "Redshift sink ID: {} loaded {} messages to table '{}' (total: {}, batches: {})", + self.id, + messages.len(), + self.config.target_table, + state.messages_processed, + state.batches_loaded + ); + + Ok(()) + } + + fn messages_to_csv( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + messages: &[ConsumedMessage], + ) -> Result<Vec<u8>, Error> { + let delimiter = self.config.csv_delimiter.unwrap_or(','); + let quote = self.config.csv_quote.unwrap_or('"'); + let include_metadata = self.config.include_metadata.unwrap_or(false); + + let mut csv_output = Vec::new(); + + for message in messages { + let payload_str = match &message.payload { + Payload::Json(value) => simd_json::to_string(value).unwrap_or_default(), Review Comment: Using unwrap_or_default() silently converts JSON serialization errors to empty strings. This could result in data loss without any indication to the user. Consider logging the error or returning an error result to properly handle serialization failures. ```suggestion Payload::Json(value) => simd_json::to_string(value).map_err(|e| { error!("Failed to serialize JSON payload: {}", e); Error::InvalidRecord })?, ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
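A possible follow-up to the quote-escaping comment on `messages_to_csv` above (the one without a suggestion block): a minimal sketch of the "cache the escaped quote string" idea, hoisting the `format!` allocation out of the per-message loop. `escape_payloads` is a hypothetical helper used only to isolate the escaping step; the real function would keep building full CSV rows as in the diff.

```rust
/// Sketch only: double the configured quote character once, then reuse that
/// string for every payload instead of calling `format!` per message.
fn escape_payloads(payloads: &[String], quote: char) -> Vec<String> {
    // One allocation, shared by the whole batch.
    let escaped_quote = format!("{quote}{quote}");
    payloads
        .iter()
        .map(|payload| payload.replace(quote, &escaped_quote))
        .collect()
}

fn main() {
    let rows = vec![r#"say "hi""#.to_string()];
    assert_eq!(escape_payloads(&rows, '"'), vec![r#"say ""hi"""#.to_string()]);
}
```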

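For readers unfamiliar with the S3-staging flow described in the README overview and in `process_batch` ("Execute COPY command"): the connector's actual `execute_copy` is not quoted in this thread, but the statement it has to issue looks roughly like the sketch below. `build_copy_sql` is a hypothetical helper; the table, bucket, and IAM role values mirror the README example, and the options shown (CSV, DELIMITER, QUOTE AS, IGNOREHEADER, MAXERROR, GZIP) are standard Redshift COPY parameters.

```rust
/// Sketch only: the general shape of a Redshift COPY statement for a CSV file
/// staged in S3, authorized via an IAM role. Not the connector's real code.
fn build_copy_sql(table: &str, s3_path: &str, iam_role: &str, max_errors: u32) -> String {
    format!(
        "COPY {table} FROM '{s3_path}' \
         IAM_ROLE '{iam_role}' \
         CSV DELIMITER ',' QUOTE AS '\"' \
         IGNOREHEADER 0 MAXERROR {max_errors} GZIP"
    )
}

fn main() {
    println!(
        "{}",
        build_copy_sql(
            "public.events",
            "s3://my-staging-bucket/redshift/staging/batch-0001.csv",
            "arn:aws:iam::123456789012:role/RedshiftS3Access",
            10,
        )
    );
}
```

(GZIP here corresponds to `compression = "gzip"`; it would be omitted for uncompressed staging files.)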