This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch postres_connector
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/postres_connector by this push:
     new 2a18d33be Fix tests and postgres logic, add verbose flag
2a18d33be is described below

commit 2a18d33be1d1a6d4ecbdb71b5284261beb062c45
Author: spetz <[email protected]>
AuthorDate: Mon Jan 19 10:29:28 2026 +0100

    Fix tests and postgres logic, add verbose flag
---
 core/connectors/runtime/src/configs/connectors.rs  |  10 +
 .../src/configs/connectors/local_provider.rs       |  32 +-
 core/connectors/runtime/src/main.rs                |   2 +
 core/connectors/runtime/src/sink.rs                |  36 +-
 core/connectors/runtime/src/source.rs              |  26 +-
 core/connectors/sdk/src/lib.rs                     |   2 +
 core/connectors/sinks/README.md                    |   2 +
 .../sinks/postgres_sink/config.toml}               |  12 +-
 core/connectors/sinks/postgres_sink/src/lib.rs     | 364 +++++++++++++--------
 core/connectors/sources/README.md                  |   2 +
 core/connectors/sources/postgres_source/README.md  |   8 +-
 .../sources/postgres_source/config.toml}           |  17 +-
 core/integration/tests/connectors/mod.rs           |  12 +-
 core/integration/tests/connectors/postgres/mod.rs  | 187 +++++++----
 .../tests/connectors/postgres/sink.toml            |   2 +-
 .../tests/connectors/postgres/sink_json.toml       |  20 --
 .../connectors/postgres/sink_json_config/sink.toml |  42 ---
 .../tests/connectors/postgres/sink_raw.toml        |  20 --
 .../connectors/postgres/sink_raw_config/sink.toml  |  42 ---
 .../tests/connectors/postgres/source.toml          |   2 +-
 .../tests/connectors/postgres/source_bytea.toml    |  20 --
 .../postgres/source_bytea_config/source.toml       |  40 ---
 .../tests/connectors/postgres/source_delete.toml   |  20 --
 .../postgres/source_delete_config/source.toml      |  41 ---
 .../tests/connectors/postgres/source_json.toml     |  20 --
 .../postgres/source_json_config/source.toml        |  40 ---
 .../tests/connectors/postgres/source_mark.toml     |  20 --
 .../postgres/source_mark_config/source.toml        |  41 ---
 28 files changed, 466 insertions(+), 616 deletions(-)

diff --git a/core/connectors/runtime/src/configs/connectors.rs 
b/core/connectors/runtime/src/configs/connectors.rs
index 6bacdc7e4..14dd0bdf8 100644
--- a/core/connectors/runtime/src/configs/connectors.rs
+++ b/core/connectors/runtime/src/configs/connectors.rs
@@ -80,6 +80,8 @@ pub struct CreateSinkConfig {
     pub streams: Vec<StreamConsumerConfig>,
     pub plugin_config_format: Option<ConfigFormat>,
     pub plugin_config: Option<serde_json::Value>,
+    #[serde(default)]
+    pub verbose: bool,
 }
 
 impl CreateSinkConfig {
@@ -94,6 +96,7 @@ impl CreateSinkConfig {
             streams: self.streams.clone(),
             plugin_config_format: self.plugin_config_format,
             plugin_config: self.plugin_config.clone(),
+            verbose: self.verbose,
         }
     }
 }
@@ -109,6 +112,8 @@ pub struct SinkConfig {
     pub streams: Vec<StreamConsumerConfig>,
     pub plugin_config_format: Option<ConfigFormat>,
     pub plugin_config: Option<serde_json::Value>,
+    #[serde(default)]
+    pub verbose: bool,
 }
 
 #[derive(Debug, Default, Clone, Serialize, Deserialize)]
@@ -120,6 +125,8 @@ pub struct CreateSourceConfig {
     pub streams: Vec<StreamProducerConfig>,
     pub plugin_config_format: Option<ConfigFormat>,
     pub plugin_config: Option<serde_json::Value>,
+    #[serde(default)]
+    pub verbose: bool,
 }
 
 impl CreateSourceConfig {
@@ -134,6 +141,7 @@ impl CreateSourceConfig {
             streams: self.streams.clone(),
             plugin_config_format: self.plugin_config_format,
             plugin_config: self.plugin_config.clone(),
+            verbose: self.verbose,
         }
     }
 }
@@ -149,6 +157,8 @@ pub struct SourceConfig {
     pub streams: Vec<StreamProducerConfig>,
     pub plugin_config_format: Option<ConfigFormat>,
     pub plugin_config: Option<serde_json::Value>,
+    #[serde(default)]
+    pub verbose: bool,
 }
 
 #[derive(Debug, Clone, Default, Serialize, Deserialize)]
diff --git a/core/connectors/runtime/src/configs/connectors/local_provider.rs 
b/core/connectors/runtime/src/configs/connectors/local_provider.rs
index d3024e460..fb363d9c5 100644
--- a/core/connectors/runtime/src/configs/connectors/local_provider.rs
+++ b/core/connectors/runtime/src/configs/connectors/local_provider.rs
@@ -164,24 +164,36 @@ impl LocalConnectorsConfigProvider<Created> {
 
         let sinks: DashMap<ConnectorId, SinkConfigFile> = DashMap::new();
         let sources: DashMap<ConnectorId, SourceConfigFile> = DashMap::new();
-        info!("Loading connectors configuration from: {}", self.config_dir);
+        let cwd = match std::env::current_dir() {
+            Ok(path) => path.display().to_string(),
+            Err(_) => "unknown".to_string(),
+        };
+        info!(
+            "Loading connectors configuration from: {}, current directory: 
{cwd}",
+            self.config_dir
+        );
         let entries = std::fs::read_dir(&self.config_dir)?;
         for entry in entries.flatten() {
             let path = entry.path();
             if path.is_file() {
                 if path.extension().and_then(|ext| ext.to_str()) != 
Some("toml") {
-                    debug!("Skipping non-TOML file: {:?}", path);
+                    debug!("Skipping non-TOML file: {}", path.display());
                     continue;
                 }
 
-                if let Some(file_name) = path.file_name().and_then(|n| 
n.to_str())
-                    && file_name.starts_with('.')
-                {
-                    debug!("Skipping hidden file: {:?}", path);
-                    continue;
+                if let Some(file_name) = path.file_name().and_then(|n| 
n.to_str()) {
+                    if file_name.starts_with('.') {
+                        debug!("Skipping hidden file: {}", path.display());
+                        continue;
+                    }
+                    let file_name_lower = file_name.to_lowercase();
+                    if file_name_lower == "cargo.toml" {
+                        debug!("Skipping Cargo.toml: {}", path.display());
+                        continue;
+                    }
                 }
 
-                debug!("Loading connector configuration from: {:?}", path);
+                info!("Loading connector configuration from: {}", 
path.display());
                 let base_config = Self::read_base_config(&path)?;
                 debug!("Loaded base configuration: {:?}", base_config);
                 let path = path
@@ -229,8 +241,8 @@ impl LocalConnectorsConfigProvider<Created> {
                     }
                 }
 
-                debug!(
-                    "Loaded connector configuration with key {}, version {}, 
created at {}",
+                info!(
+                    "Loaded connector configuration with key: {}, version: {}, 
created at {}",
                     base_config.key(),
                     version,
                     created_at.to_rfc3339()
diff --git a/core/connectors/runtime/src/main.rs 
b/core/connectors/runtime/src/main.rs
index da0ac4214..d600fc7eb 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -276,6 +276,7 @@ struct SinkConnectorPlugin {
     config_format: Option<ConfigFormat>,
     consumers: Vec<SinkConnectorConsumer>,
     error: Option<String>,
+    verbose: bool,
 }
 
 struct SinkConnectorConsumer {
@@ -310,6 +311,7 @@ struct SourceConnectorPlugin {
     producer: Option<SourceConnectorProducer>,
     state_storage: StateStorage,
     error: Option<String>,
+    verbose: bool,
 }
 
 struct SourceConnectorProducer {
diff --git a/core/connectors/runtime/src/sink.rs 
b/core/connectors/runtime/src/sink.rs
index abd04971e..c9d8bd3da 100644
--- a/core/connectors/runtime/src/sink.rs
+++ b/core/connectors/runtime/src/sink.rs
@@ -40,7 +40,7 @@ use std::{
     sync::{Arc, atomic::Ordering},
     time::Instant,
 };
-use tracing::{error, info, warn};
+use tracing::{debug, error, info, warn};
 
 pub async fn init(
     sink_configs: HashMap<String, SinkConfig>,
@@ -78,6 +78,7 @@ pub async fn init(
                 config_format: config.plugin_config_format,
                 consumers: vec![],
                 error: init_error.clone(),
+                verbose: config.verbose,
             });
         } else {
             let container: Container<SinkApi> =
@@ -102,6 +103,7 @@ pub async fn init(
                         config_format: config.plugin_config_format,
                         consumers: vec![],
                         error: init_error.clone(),
+                        verbose: config.verbose,
                     }],
                 },
             );
@@ -205,6 +207,7 @@ pub fn consume(sinks: Vec<SinkConnectorWrapper>, context: 
Arc<RuntimeContext>) {
                         sink.callback,
                         consumer.transforms,
                         consumer.consumer,
+                        plugin.verbose,
                     )
                     .await
                     {
@@ -233,6 +236,7 @@ async fn consume_messages(
     consume: ConsumeCallback,
     transforms: Vec<Arc<dyn Transform>>,
     mut consumer: IggyConsumer,
+    verbose: bool,
 ) -> Result<(), RuntimeError> {
     info!("Started consuming messages for sink connector with ID: 
{plugin_id}");
     let batch_size = batch_size as usize;
@@ -263,10 +267,17 @@ async fn consume_messages(
             current_offset,
             schema: decoder.schema(),
         };
-        info!(
-            "Processing {messages_count} messages for sink connector with ID: 
{}",
-            plugin_id
-        );
+        if verbose {
+            info!(
+                "Processing {messages_count} messages for sink connector with 
ID: {}",
+                plugin_id
+            );
+        } else {
+            debug!(
+                "Processing {messages_count} messages for sink connector with 
ID: {}",
+                plugin_id
+            );
+        }
         let start = Instant::now();
         if let Err(error) = process_messages(
             plugin_id,
@@ -286,10 +297,17 @@ async fn consume_messages(
         }
 
         let elapsed = start.elapsed();
-        info!(
-            "Consumed {messages_count} messages in {:#?} for sink connector 
with ID: {plugin_id}",
-            elapsed
-        );
+        if verbose {
+            info!(
+                "Consumed {messages_count} messages in {:#?} for sink 
connector with ID: {plugin_id}",
+                elapsed
+            );
+        } else {
+            debug!(
+                "Consumed {messages_count} messages in {:#?} for sink 
connector with ID: {plugin_id}",
+                elapsed
+            );
+        }
     }
     info!("Stopped consuming messages for sink connector with ID: 
{plugin_id}");
     Ok(())
diff --git a/core/connectors/runtime/src/source.rs 
b/core/connectors/runtime/src/source.rs
index 26af0eaff..5d6d88744 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -91,6 +91,7 @@ pub async fn init(
                 transforms: vec![],
                 state_storage,
                 error: init_error.clone(),
+                verbose: config.verbose,
             });
         } else {
             let container: Container<SourceApi> =
@@ -118,6 +119,7 @@ pub async fn init(
                         transforms: vec![],
                         state_storage,
                         error: init_error.clone(),
+                        verbose: config.verbose,
                     }],
                 },
             );
@@ -267,7 +269,11 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>, 
context: Arc<RuntimeContext>
 
                 while let Ok(produced_messages) = receiver.recv_async().await {
                     let count = produced_messages.messages.len();
-                    info!("Source connector with ID: {plugin_id} received 
{count} messages",);
+                    if plugin.verbose {
+                        info!("Source connector with ID: {plugin_id} received 
{count} messages",);
+                    } else {
+                        debug!("Source connector with ID: {plugin_id} received 
{count} messages",);
+                    }
                     let schema = produced_messages.schema;
                     let mut messages: Vec<DecodedMessage> = 
Vec::with_capacity(count);
                     for message in produced_messages.messages {
@@ -322,11 +328,19 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>, 
context: Arc<RuntimeContext>
                         continue;
                     }
 
-                    info!(
-                        "Sent {count} messages to stream: {}, topic: {} by 
source connector with ID: {plugin_id}",
-                        producer.stream(),
-                        producer.topic()
-                    );
+                    if plugin.verbose {
+                        info!(
+                            "Sent {count} messages to stream: {}, topic: {} by 
source connector with ID: {plugin_id}",
+                            producer.stream(),
+                            producer.topic()
+                        );
+                    } else {
+                        debug!(
+                            "Sent {count} messages to stream: {}, topic: {} by 
source connector with ID: {plugin_id}",
+                            producer.stream(),
+                            producer.topic()
+                        );
+                    }
 
                     let Some(state) = produced_messages.state else {
                         debug!("No state provided for source connector with 
ID: {plugin_id}");
diff --git a/core/connectors/sdk/src/lib.rs b/core/connectors/sdk/src/lib.rs
index 98066f33d..b91ade3dc 100644
--- a/core/connectors/sdk/src/lib.rs
+++ b/core/connectors/sdk/src/lib.rs
@@ -324,4 +324,6 @@ pub enum Error {
     InvalidState,
     #[error("Connection error: {0}")]
     Connection(String),
+    #[error("Cannot store data: {0}")]
+    CannotStoreData(String),
 }
diff --git a/core/connectors/sinks/README.md b/core/connectors/sinks/README.md
index f29b4b92f..a2782c8e6 100644
--- a/core/connectors/sinks/README.md
+++ b/core/connectors/sinks/README.md
@@ -40,6 +40,7 @@ pub struct SinkConfig {
     pub streams: Vec<StreamConsumerConfig>,
     pub plugin_config_format: Option<ConfigFormat>,
     pub plugin_config: Option<serde_json::Value>,
+    pub verbose: bool, // Log message processing at info level instead of 
debug (default: false)
 }
 ```
 
@@ -64,6 +65,7 @@ version = 0
 name = "Stdout sink"
 path = "target/release/libiggy_connector_stdout_sink"
 plugin_config_format = "toml"
+verbose = false # Log message processing at info level instead of debug
 
 # Collection of the streams from which messages are consumed
 [[streams]]
diff --git 
a/core/integration/tests/connectors/postgres/connectors_config/sink.toml 
b/core/connectors/sinks/postgres_sink/config.toml
similarity index 81%
rename from 
core/integration/tests/connectors/postgres/connectors_config/sink.toml
rename to core/connectors/sinks/postgres_sink/config.toml
index 7476a2126..84a79def5 100644
--- a/core/integration/tests/connectors/postgres/connectors_config/sink.toml
+++ b/core/connectors/sinks/postgres_sink/config.toml
@@ -20,18 +20,19 @@ key = "postgres"
 enabled = true
 version = 0
 name = "Postgres sink"
-path = "../../target/debug/libiggy_connector_postgres_sink"
+path = "../../../target/debug/libiggy_connector_postgres_sink"
+verbose = false
 
 [[streams]]
-stream = "test_stream"
-topics = ["test_topic"]
+stream = "user_events"
+topics = ["users", "orders"]
 schema = "json"
 batch_length = 100
 poll_interval = "5ms"
-consumer_group = "test"
+consumer_group = "postgres_sink"
 
 [plugin_config]
-connection_string = ""
+connection_string = "postgresql://user:pass@localhost:5432/database"
 target_table = "iggy_messages"
 batch_size = 100
 max_connections = 10
@@ -39,3 +40,4 @@ auto_create_table = true
 include_metadata = true
 include_checksum = true
 include_origin_timestamp = true
+payload_format = "bytea"
diff --git a/core/connectors/sinks/postgres_sink/src/lib.rs 
b/core/connectors/sinks/postgres_sink/src/lib.rs
index 2acfb7a6a..03757b729 100644
--- a/core/connectors/sinks/postgres_sink/src/lib.rs
+++ b/core/connectors/sinks/postgres_sink/src/lib.rs
@@ -180,12 +180,13 @@ impl PostgresSink {
 
         let pool = self.get_pool()?;
         let table_name = &self.config.target_table;
+        let quoted_table = quote_identifier(table_name)?;
         let include_metadata = self.config.include_metadata.unwrap_or(true);
         let include_checksum = self.config.include_checksum.unwrap_or(true);
         let include_origin_timestamp = 
self.config.include_origin_timestamp.unwrap_or(true);
         let payload_type = self.payload_format().sql_type();
 
-        let mut sql = format!("CREATE TABLE IF NOT EXISTS {table_name} (");
+        let mut sql = format!("CREATE TABLE IF NOT EXISTS {quoted_table} (");
         sql.push_str("id DECIMAL(39, 0) PRIMARY KEY");
 
         if include_metadata {
@@ -263,110 +264,160 @@ impl PostgresSink {
         messages_metadata: &MessagesMetadata,
         pool: &Pool<Postgres>,
     ) -> Result<(), Error> {
+        if messages.is_empty() {
+            return Ok(());
+        }
+
         let table_name = &self.config.target_table;
         let include_metadata = self.config.include_metadata.unwrap_or(true);
         let include_checksum = self.config.include_checksum.unwrap_or(true);
         let include_origin_timestamp = 
self.config.include_origin_timestamp.unwrap_or(true);
         let payload_format = self.payload_format();
 
-        for message in messages {
-            let payload_bytes = 
message.payload.clone().try_into_vec().map_err(|e| {
-                error!("Failed to convert payload to bytes: {e}");
-                Error::InvalidRecord
-            })?;
-
-            let json_value = self.parse_json_payload(&payload_bytes, 
payload_format)?;
-            let text_value = self.parse_text_payload(&payload_bytes, 
payload_format)?;
-
-            let (query, _) = self.build_insert_query(
-                table_name,
-                include_metadata,
-                include_checksum,
-                include_origin_timestamp,
-            );
-
-            let timestamp = self.parse_timestamp(message.timestamp);
-            let origin_timestamp = 
self.parse_timestamp(message.origin_timestamp);
-
-            self.execute_insert_with_retry(
-                pool,
-                &query,
-                message,
-                topic_metadata,
-                messages_metadata,
-                include_metadata,
-                include_checksum,
-                include_origin_timestamp,
-                timestamp,
-                origin_timestamp,
-                payload_format,
-                &payload_bytes,
-                &json_value,
-                &text_value,
-            )
-            .await?;
-        }
-
-        Ok(())
+        let (query, _params_per_row) = self.build_batch_insert_query(
+            table_name,
+            include_metadata,
+            include_checksum,
+            include_origin_timestamp,
+            messages.len(),
+        )?;
+
+        self.execute_batch_insert_with_retry(
+            pool,
+            &query,
+            messages,
+            topic_metadata,
+            messages_metadata,
+            include_metadata,
+            include_checksum,
+            include_origin_timestamp,
+            payload_format,
+        )
+        .await
     }
 
     #[allow(clippy::too_many_arguments)]
-    async fn execute_insert_with_retry(
+    async fn execute_batch_insert_with_retry(
         &self,
         pool: &Pool<Postgres>,
         query: &str,
-        message: &ConsumedMessage,
+        messages: &[ConsumedMessage],
         topic_metadata: &TopicMetadata,
         messages_metadata: &MessagesMetadata,
         include_metadata: bool,
         include_checksum: bool,
         include_origin_timestamp: bool,
-        timestamp: DateTime<Utc>,
-        origin_timestamp: DateTime<Utc>,
         payload_format: PayloadFormat,
-        payload_bytes: &[u8],
-        json_value: &Option<serde_json::Value>,
-        text_value: &Option<String>,
     ) -> Result<(), Error> {
         let max_retries = self.get_max_retries();
         let retry_delay = self.get_retry_delay();
         let mut attempts = 0u32;
 
         loop {
-            let mut query_obj = 
sqlx::query(query).bind(message.id.to_string());
+            let result = self
+                .bind_and_execute_batch(
+                    pool,
+                    query,
+                    messages,
+                    topic_metadata,
+                    messages_metadata,
+                    include_metadata,
+                    include_checksum,
+                    include_origin_timestamp,
+                    payload_format,
+                )
+                .await;
+
+            match result {
+                Ok(_) => return Ok(()),
+                Err((e, is_transient)) => {
+                    attempts += 1;
+                    if !is_transient || attempts >= max_retries {
+                        error!("Batch insert failed after {attempts} attempts: 
{e}");
+                        return Err(Error::CannotStoreData(format!(
+                            "Batch insert failed after {attempts} attempts: 
{e}"
+                        )));
+                    }
+                    warn!(
+                        "Transient database error (attempt 
{attempts}/{max_retries}): {e}. Retrying..."
+                    );
+                    tokio::time::sleep(retry_delay * attempts).await;
+                }
+            }
+        }
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    async fn bind_and_execute_batch(
+        &self,
+        pool: &Pool<Postgres>,
+        query: &str,
+        messages: &[ConsumedMessage],
+        topic_metadata: &TopicMetadata,
+        messages_metadata: &MessagesMetadata,
+        include_metadata: bool,
+        include_checksum: bool,
+        include_origin_timestamp: bool,
+        payload_format: PayloadFormat,
+    ) -> Result<(), (sqlx::Error, bool)> {
+        let mut query_builder = sqlx::query(query);
+
+        for message in messages {
+            let payload_bytes = 
message.payload.clone().try_into_vec().map_err(|e| {
+                let err_msg = format!("Failed to convert payload to bytes: 
{e}");
+                (sqlx::Error::Protocol(err_msg), false)
+            })?;
+
+            let timestamp = self.parse_timestamp(message.timestamp);
+            let origin_timestamp_val = 
self.parse_timestamp(message.origin_timestamp);
+
+            query_builder = query_builder.bind(message.id.to_string());
 
             if include_metadata {
-                query_obj = query_obj
+                query_builder = query_builder
                     .bind(message.offset as i64)
                     .bind(timestamp)
-                    .bind(&topic_metadata.stream)
-                    .bind(&topic_metadata.topic)
+                    .bind(topic_metadata.stream.clone())
+                    .bind(topic_metadata.topic.clone())
                     .bind(messages_metadata.partition_id as i32);
             }
 
             if include_checksum {
-                query_obj = query_obj.bind(message.checksum as i64);
+                query_builder = query_builder.bind(message.checksum as i64);
             }
 
             if include_origin_timestamp {
-                query_obj = query_obj.bind(origin_timestamp);
+                query_builder = query_builder.bind(origin_timestamp_val);
             }
 
-            query_obj = match payload_format {
-                PayloadFormat::Bytea => query_obj.bind(payload_bytes.to_vec()),
-                PayloadFormat::Json => query_obj.bind(json_value.clone()),
-                PayloadFormat::Text => query_obj.bind(text_value.clone()),
-            };
-
-            match query_obj.execute(pool).await {
-                Ok(_) => return Ok(()),
-                Err(e) => {
-                    attempts += 1;
-                    handle_retry_error(e, attempts, max_retries)?;
-                    tokio::time::sleep(retry_delay * attempts).await;
+            query_builder = match payload_format {
+                PayloadFormat::Bytea => query_builder.bind(payload_bytes),
+                PayloadFormat::Json => {
+                    let json_value: serde_json::Value = 
serde_json::from_slice(&payload_bytes)
+                        .map_err(|e| {
+                            let err_msg = format!("Failed to parse payload as 
JSON: {e}");
+                            error!("{err_msg}");
+                            (sqlx::Error::Protocol(err_msg), false)
+                        })?;
+                    query_builder.bind(json_value)
                 }
-            }
+                PayloadFormat::Text => {
+                    let text_value = 
String::from_utf8(payload_bytes).map_err(|e| {
+                        let err_msg = format!("Failed to parse payload as 
UTF-8 text: {e}");
+                        error!("{err_msg}");
+                        (sqlx::Error::Protocol(err_msg), false)
+                    })?;
+                    query_builder.bind(text_value)
+                }
+            };
         }
+
+        query_builder.execute(pool).await.map_err(|e| {
+            let is_transient = is_transient_error(&e);
+            (e, is_transient)
+        })?;
+
+        Ok(())
     }
 
     fn get_pool(&self) -> Result<&Pool<Postgres>, Error> {
@@ -402,90 +453,56 @@ impl PostgresSink {
         .unwrap_or_else(Utc::now)
     }
 
-    fn parse_json_payload(
-        &self,
-        payload_bytes: &[u8],
-        format: PayloadFormat,
-    ) -> Result<Option<serde_json::Value>, Error> {
-        match format {
-            PayloadFormat::Json => {
-                let value = serde_json::from_slice(payload_bytes).map_err(|e| {
-                    error!("Failed to parse payload as JSON: {e}");
-                    Error::InvalidRecord
-                })?;
-                Ok(Some(value))
-            }
-            _ => Ok(None),
-        }
-    }
-
-    fn parse_text_payload(
-        &self,
-        payload_bytes: &[u8],
-        format: PayloadFormat,
-    ) -> Result<Option<String>, Error> {
-        match format {
-            PayloadFormat::Text => {
-                let value = 
String::from_utf8(payload_bytes.to_vec()).map_err(|e| {
-                    error!("Failed to parse payload as UTF-8 text: {e}");
-                    Error::InvalidRecord
-                })?;
-                Ok(Some(value))
-            }
-            _ => Ok(None),
-        }
-    }
-
-    fn build_insert_query(
+    fn build_batch_insert_query(
         &self,
         table_name: &str,
         include_metadata: bool,
         include_checksum: bool,
         include_origin_timestamp: bool,
-    ) -> (String, u32) {
-        let mut query = format!("INSERT INTO {table_name} (id");
-        let mut values = "($1::numeric".to_string();
-        let mut param_count = 1;
+        row_count: usize,
+    ) -> Result<(String, u32), Error> {
+        let quoted_table = quote_identifier(table_name)?;
+        let mut query = format!("INSERT INTO {quoted_table} (id");
+
+        let mut params_per_row: u32 = 1; // id
 
         if include_metadata {
             query.push_str(
                 ", iggy_offset, iggy_timestamp, iggy_stream, iggy_topic, 
iggy_partition_id",
             );
-            for i in 2..=6 {
-                values.push_str(&format!(", ${i}"));
-            }
-            param_count = 6;
+            params_per_row += 5;
         }
 
         if include_checksum {
-            param_count += 1;
             query.push_str(", iggy_checksum");
-            values.push_str(&format!(", ${param_count}"));
+            params_per_row += 1;
         }
 
         if include_origin_timestamp {
-            param_count += 1;
             query.push_str(", iggy_origin_timestamp");
-            values.push_str(&format!(", ${param_count}"));
+            params_per_row += 1;
         }
 
-        param_count += 1;
         query.push_str(", payload");
-        values.push_str(&format!(", ${param_count}"));
+        params_per_row += 1;
 
-        query.push_str(&format!(") VALUES {values})"));
+        query.push_str(") VALUES ");
 
-        (query, param_count)
-    }
-}
+        let mut value_groups = Vec::with_capacity(row_count);
+        for row_idx in 0..row_count {
+            let base_param = (row_idx as u32) * params_per_row;
+            let mut values = format!("(${}::numeric", base_param + 1);
+            for i in 2..=params_per_row {
+                values.push_str(&format!(", ${}", base_param + i));
+            }
+            values.push(')');
+            value_groups.push(values);
+        }
 
-fn handle_retry_error(e: sqlx::Error, attempts: u32, max_retries: u32) -> 
Result<bool, Error> {
-    if attempts >= max_retries || !is_transient_error(&e) {
-        error!("Database operation failed after {attempts} attempts: {e}");
-        return Err(Error::InvalidRecord);
+        query.push_str(&value_groups.join(", "));
+
+        Ok((query, params_per_row))
     }
-    warn!("Transient database error (attempt {attempts}/{max_retries}): {e}. 
Retrying...");
-    Ok(true)
 }
 
 fn is_transient_error(e: &sqlx::Error) -> bool {
@@ -504,6 +521,19 @@ fn is_transient_error(e: &sqlx::Error) -> bool {
     }
 }
 
+fn quote_identifier(name: &str) -> Result<String, Error> {
+    if name.is_empty() {
+        return Err(Error::InitError("Table name cannot be empty".to_string()));
+    }
+    if name.contains('\0') {
+        return Err(Error::InitError(
+            "Table name cannot contain null characters".to_string(),
+        ));
+    }
+    let escaped = name.replace('"', "\"\"");
+    Ok(format!("\"{escaped}\""))
+}
+
 fn redact_connection_string(conn_str: &str) -> String {
     if let Some(scheme_end) = conn_str.find("://") {
         let scheme = &conn_str[..scheme_end + 3];
@@ -587,9 +617,11 @@ mod tests {
     #[test]
     fn given_all_options_enabled_should_build_full_insert_query() {
         let sink = PostgresSink::new(1, test_config());
-        let (query, param_count) = sink.build_insert_query("messages", true, 
true, true);
+        let (query, param_count) = sink
+            .build_batch_insert_query("messages", true, true, true, 1)
+            .expect("Failed to build query");
 
-        assert!(query.contains("INSERT INTO messages"));
+        assert!(query.contains("INSERT INTO \"messages\""));
         assert!(query.contains("iggy_offset"));
         assert!(query.contains("iggy_timestamp"));
         assert!(query.contains("iggy_stream"));
@@ -604,9 +636,11 @@ mod tests {
     #[test]
     fn given_metadata_disabled_should_build_minimal_insert_query() {
         let sink = PostgresSink::new(1, test_config());
-        let (query, param_count) = sink.build_insert_query("messages", false, 
false, false);
+        let (query, param_count) = sink
+            .build_batch_insert_query("messages", false, false, false, 1)
+            .expect("Failed to build query");
 
-        assert!(query.contains("INSERT INTO messages"));
+        assert!(query.contains("INSERT INTO \"messages\""));
         assert!(!query.contains("iggy_offset"));
         assert!(!query.contains("iggy_checksum"));
         assert!(!query.contains("iggy_origin_timestamp"));
@@ -617,7 +651,9 @@ mod tests {
     #[test]
     fn given_only_checksum_enabled_should_include_checksum() {
         let sink = PostgresSink::new(1, test_config());
-        let (query, param_count) = sink.build_insert_query("messages", false, true, false);
+        let (query, param_count) = sink
+            .build_batch_insert_query("messages", false, true, false, 1)
+            .expect("Failed to build query");
 
         assert!(!query.contains("iggy_offset"));
         assert!(query.contains("iggy_checksum"));
@@ -696,4 +732,72 @@ mod tests {
         let redacted = redact_connection_string(conn);
         assert_eq!(redacted, "postgresql://adm***");
     }
+
+    #[test]
+    fn given_special_chars_in_identifier_should_escape() { // an embedded `"` must come back doubled
+        let result = quote_identifier("table\"name").expect("Failed to quote");
+        assert_eq!(result, "\"table\"\"name\"");
+    }
+
+    #[test]
+    fn given_empty_identifier_should_fail() { // "" must yield Err rather than an empty quoted pair
+        let result = quote_identifier("");
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn given_null_char_in_identifier_should_fail() { // a NUL inside the name must yield Err
+        let result = quote_identifier("table\0name");
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn given_normal_identifier_should_quote() { // a plain name is only wrapped, never altered
+        let result = quote_identifier("my_table").expect("Failed to quote");
+        assert_eq!(result, "\"my_table\"");
+    }
+
+    #[test]
+    fn given_identifier_with_spaces_should_quote() { // spaces are accepted and kept verbatim
+        let result = quote_identifier("my table").expect("Failed to quote");
+        assert_eq!(result, "\"my table\"");
+    }
+
+    #[test]
+    fn given_identifier_with_sql_injection_should_escape() {
+        let result = quote_identifier("messages\"; DROP TABLE users; --").expect("Failed to quote");
+        assert_eq!(result, "\"messages\"\"; DROP TABLE users; --\"");
+    }
+
+    #[test]
+    fn given_batch_of_3_rows_should_build_multi_row_insert_query() {
+        let sink = PostgresSink::new(1, test_config());
+        let (query, params_per_row) = sink
+            .build_batch_insert_query("messages", true, true, true, 3)
+            .expect("Failed to build batch query");
+
+        // With all options: id + 5 metadata + checksum + origin_timestamp + payload = 9 params per row
+        assert_eq!(params_per_row, 9);
+
+        // Should have 3 value groups
+        assert!(query.contains("($1::numeric, $2, $3, $4, $5, $6, $7, $8, $9)"));
+        assert!(query.contains("($10::numeric, $11, $12, $13, $14, $15, $16, $17, $18)"));
+        assert!(query.contains("($19::numeric, $20, $21, $22, $23, $24, $25, $26, $27)"));
+    }
+
+    #[test]
+    fn given_batch_of_2_rows_minimal_should_build_correct_query() { // all three flags off, batch of 2
+        let sink = PostgresSink::new(1, test_config());
+        let (query, params_per_row) = sink
+            .build_batch_insert_query("messages", false, false, false, 2)
+            .expect("Failed to build batch query");
+
+        // With minimal options: id + payload = 2 params per row
+        assert_eq!(params_per_row, 2);
+
+        // Should have 2 value groups
+        assert!(query.contains("($1::numeric, $2)"));
+        assert!(query.contains("($3::numeric, $4)"));
+        assert!(!query.contains("$5")); // placeholders must stop at $4: no third value group
+    }
 }
diff --git a/core/connectors/sources/README.md b/core/connectors/sources/README.md
index 4bdb8e985..90c453545 100644
--- a/core/connectors/sources/README.md
+++ b/core/connectors/sources/README.md
@@ -35,6 +35,7 @@ pub struct SourceConfig {
     pub streams: Vec<StreamProducerConfig>,
     pub plugin_config_format: Option<ConfigFormat>,
     pub plugin_config: Option<serde_json::Value>,
+    pub verbose: bool, // Log message processing at info level instead of debug (default: false)
 }
 ```
 
@@ -59,6 +60,7 @@ version = 0
 name = "Random source" # Name of the source
 path = "libiggy_connector_random_source" # Path to the source connector
 config_format = "toml"
+verbose = false # Log message processing at info level instead of debug
 
 # Collection of the streams to which the produced messages are sent
 [[streams]]
diff --git a/core/connectors/sources/postgres_source/README.md b/core/connectors/sources/postgres_source/README.md
index 0656f4ef5..000f3fb1f 100644
--- a/core/connectors/sources/postgres_source/README.md
+++ b/core/connectors/sources/postgres_source/README.md
@@ -21,7 +21,7 @@ The PostgreSQL source connector fetches data from PostgreSQL databases and strea
 connection_string = "postgresql://user:pass@localhost:5432/database"
 mode = "polling"
 tables = ["users", "orders"]
-poll_interval = "30s"
+poll_interval = "1s"
 batch_size = 1000
 tracking_column = "id"
 initial_offset = "0"
@@ -56,7 +56,7 @@ cdc_backend = "builtin"
 | `connection_string` | string | required | PostgreSQL connection string |
 | `mode` | string | required | `polling` or `cdc` |
 | `tables` | array | required | List of tables to monitor |
-| `poll_interval` | string | `10s` | How often to poll (e.g., `30s`, `5m`) |
+| `poll_interval` | string | `1s` | How often to poll (e.g., `1s`, `5m`) |
 | `batch_size` | u32 | `1000` | Max rows per poll |
 | `tracking_column` | string | `id` | Column for incremental updates |
 | `initial_offset` | string | none | Starting value for tracking column |
@@ -289,7 +289,7 @@ batch_length = 100
 connection_string = "postgresql://user:pass@localhost:5432/mydb"
 mode = "polling"
 tables = ["users"]
-poll_interval = "10s"
+poll_interval = "1s"
 tracking_column = "updated_at"
 ```
 
@@ -326,7 +326,7 @@ batch_length = 100
 connection_string = "postgresql://user:pass@localhost:5432/mydb"
 mode = "polling"
 tables = ["events"]
-poll_interval = "10s"
+poll_interval = "1s"
 tracking_column = "id"
 payload_column = "data"
 payload_format = "json_direct"
diff --git a/core/integration/tests/connectors/postgres/source_config/source.toml b/core/connectors/sources/postgres_source/config.toml
similarity index 76%
rename from core/integration/tests/connectors/postgres/source_config/source.toml
rename to core/connectors/sources/postgres_source/config.toml
index ac5a36e8c..be53647af 100644
--- a/core/integration/tests/connectors/postgres/source_config/source.toml
+++ b/core/connectors/sources/postgres_source/config.toml
@@ -20,20 +20,23 @@ key = "postgres"
 enabled = true
 version = 0
 name = "Postgres source"
-path = "../../target/debug/libiggy_connector_postgres_source"
+path = "../../../target/debug/libiggy_connector_postgres_source"
+verbose = false
 
 [[streams]]
-stream = "test_stream"
-topic = "test_topic"
+stream = "user_events"
+topic = "users"
 schema = "json"
 batch_length = 100
 
 [plugin_config]
-connection_string = ""
+connection_string = "postgresql://user:pass@localhost:5432/database"
 mode = "polling"
-tables = ["test_messages"]
-poll_interval = "5ms"
-batch_size = 100
+tables = ["users", "orders"]
+poll_interval = "1s"
+batch_size = 1000
 tracking_column = "id"
+initial_offset = "0"
 max_connections = 10
+snake_case_columns = false
 include_metadata = true
diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs
index 7f468c35e..ffbbf0dca 100644
--- a/core/integration/tests/connectors/mod.rs
+++ b/core/integration/tests/connectors/mod.rs
@@ -79,14 +79,14 @@ struct ConnectorsIggyClient {
 
 fn create_test_messages(count: usize) -> Vec<TestMessage> {
     let base_timestamp = IggyTimestamp::now().as_micros();
-    (0..count)
+    (1..=count)
         .map(|i| TestMessage {
             id: i as u64,
-            name: format!("user_{i}"),
-            count: (i * 10) as u32,
-            amount: i as f64 * 99.99,
-            active: i % 2 == 0,
-            timestamp: (base_timestamp + i as u64 * ONE_DAY_MICROS) as i64,
+            name: format!("user_{}", i - 1),
+            count: ((i - 1) * 10) as u32,
+            amount: (i - 1) as f64 * 99.99,
+            active: (i - 1) % 2 == 0,
+            timestamp: (base_timestamp + (i - 1) as u64 * ONE_DAY_MICROS) as i64,
         })
         .collect()
 }
diff --git a/core/integration/tests/connectors/postgres/mod.rs b/core/integration/tests/connectors/postgres/mod.rs
index ec4be1c00..e6e41bbb2 100644
--- a/core/integration/tests/connectors/postgres/mod.rs
+++ b/core/integration/tests/connectors/postgres/mod.rs
@@ -51,8 +51,37 @@ const FETCH_INTERVAL_MS: u64 = 10;
 const TABLE_WAIT_INTERVAL_MS: u64 = 50;
 const ENV_SINK_CONNECTION_STRING: &str =
     "IGGY_CONNECTORS_SINK_POSTGRES_PLUGIN_CONFIG_CONNECTION_STRING";
+const ENV_SINK_TARGET_TABLE: &str = "IGGY_CONNECTORS_SINK_POSTGRES_PLUGIN_CONFIG_TARGET_TABLE";
+const ENV_SINK_PAYLOAD_FORMAT: &str = "IGGY_CONNECTORS_SINK_POSTGRES_PLUGIN_CONFIG_PAYLOAD_FORMAT";
+const ENV_SINK_STREAMS_0_STREAM: &str = "IGGY_CONNECTORS_SINK_POSTGRES_STREAMS_0_STREAM";
+const ENV_SINK_STREAMS_0_TOPICS: &str = "IGGY_CONNECTORS_SINK_POSTGRES_STREAMS_0_TOPICS";
+const ENV_SINK_STREAMS_0_SCHEMA: &str = "IGGY_CONNECTORS_SINK_POSTGRES_STREAMS_0_SCHEMA";
+const ENV_SINK_STREAMS_0_CONSUMER_GROUP: &str =
+    "IGGY_CONNECTORS_SINK_POSTGRES_STREAMS_0_CONSUMER_GROUP";
 const ENV_SOURCE_CONNECTION_STRING: &str =
     "IGGY_CONNECTORS_SOURCE_POSTGRES_PLUGIN_CONFIG_CONNECTION_STRING";
+const ENV_SOURCE_TABLES: &str = "IGGY_CONNECTORS_SOURCE_POSTGRES_PLUGIN_CONFIG_TABLES";
+const ENV_SOURCE_TRACKING_COLUMN: &str =
+    "IGGY_CONNECTORS_SOURCE_POSTGRES_PLUGIN_CONFIG_TRACKING_COLUMN";
+const ENV_SOURCE_PAYLOAD_COLUMN: &str =
+    "IGGY_CONNECTORS_SOURCE_POSTGRES_PLUGIN_CONFIG_PAYLOAD_COLUMN";
+const ENV_SOURCE_PAYLOAD_FORMAT: &str =
+    "IGGY_CONNECTORS_SOURCE_POSTGRES_PLUGIN_CONFIG_PAYLOAD_FORMAT";
+const ENV_SOURCE_DELETE_AFTER_READ: &str =
+    "IGGY_CONNECTORS_SOURCE_POSTGRES_PLUGIN_CONFIG_DELETE_AFTER_READ";
+const ENV_SOURCE_PRIMARY_KEY_COLUMN: &str =
+    "IGGY_CONNECTORS_SOURCE_POSTGRES_PLUGIN_CONFIG_PRIMARY_KEY_COLUMN";
+const ENV_SOURCE_PROCESSED_COLUMN: &str =
+    "IGGY_CONNECTORS_SOURCE_POSTGRES_PLUGIN_CONFIG_PROCESSED_COLUMN";
+const ENV_SOURCE_INCLUDE_METADATA: &str =
+    "IGGY_CONNECTORS_SOURCE_POSTGRES_PLUGIN_CONFIG_INCLUDE_METADATA";
+const ENV_SOURCE_STREAMS_0_STREAM: &str = "IGGY_CONNECTORS_SOURCE_POSTGRES_STREAMS_0_STREAM";
+const ENV_SOURCE_STREAMS_0_TOPIC: &str = "IGGY_CONNECTORS_SOURCE_POSTGRES_STREAMS_0_TOPIC";
+const ENV_SOURCE_STREAMS_0_SCHEMA: &str = "IGGY_CONNECTORS_SOURCE_POSTGRES_STREAMS_0_SCHEMA";
+const ENV_SOURCE_POLL_INTERVAL: &str =
+    "IGGY_CONNECTORS_SOURCE_POSTGRES_PLUGIN_CONFIG_POLL_INTERVAL";
+const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_POSTGRES_PATH";
+const ENV_SOURCE_PATH: &str = "IGGY_CONNECTORS_SOURCE_POSTGRES_PATH";
 
 type SinkRow = (i64, String, String, Vec<u8>);
 type SinkJsonRow = (i64, serde_json::Value);
@@ -93,29 +122,50 @@ async fn create_pool(connection_string: &str) -> Pool<Postgres> {
 }
 
 async fn setup_sink() -> PostgresTestSetup {
-    setup_sink_with_config("postgres/sink.toml").await
+    let mut envs = HashMap::new();
+    envs.insert(ENV_SINK_STREAMS_0_SCHEMA.to_owned(), "json".to_owned());
+    setup_sink_with_envs(envs).await
 }
 
 async fn setup_sink_bytea() -> PostgresTestSetup {
-    setup_sink_with_config("postgres/sink_raw.toml").await
+    let mut envs = HashMap::new();
+    envs.insert(ENV_SINK_STREAMS_0_SCHEMA.to_owned(), "raw".to_owned());
+    envs.insert(ENV_SINK_PAYLOAD_FORMAT.to_owned(), "bytea".to_owned());
+    setup_sink_with_envs(envs).await
 }
 
 async fn setup_sink_json() -> PostgresTestSetup {
-    setup_sink_with_config("postgres/sink_json.toml").await
+    let mut envs = HashMap::new();
+    envs.insert(ENV_SINK_STREAMS_0_SCHEMA.to_owned(), "json".to_owned());
+    envs.insert(ENV_SINK_PAYLOAD_FORMAT.to_owned(), "json".to_owned());
+    setup_sink_with_envs(envs).await
 }
 
-async fn setup_sink_with_config(config_path: &str) -> PostgresTestSetup {
+async fn setup_sink_with_envs(mut extra_envs: HashMap<String, String>) -> PostgresTestSetup {
     let (container, connection_string) = start_container().await;
 
-    let mut envs = HashMap::new();
-    envs.insert(
+    extra_envs.insert(
         ENV_SINK_CONNECTION_STRING.to_owned(),
         connection_string.clone(),
     );
+    extra_envs.insert(ENV_SINK_TARGET_TABLE.to_owned(), SINK_TABLE.to_owned());
+    extra_envs.insert(ENV_SINK_STREAMS_0_STREAM.to_owned(), TEST_STREAM.to_owned());
+    extra_envs.insert(
+        ENV_SINK_STREAMS_0_TOPICS.to_owned(),
+        format!("[{TEST_TOPIC}]"),
+    );
+    extra_envs.insert(
+        ENV_SINK_STREAMS_0_CONSUMER_GROUP.to_owned(),
+        "test".to_owned(),
+    );
+    extra_envs.insert(
+        ENV_SINK_PATH.to_owned(),
+        "../../target/debug/libiggy_connector_postgres_sink".to_owned(),
+    );
 
     let mut runtime = setup_runtime();
     runtime
-        .init(config_path, Some(envs), IggySetup::default())
+        .init("postgres/sink.toml", Some(extra_envs), IggySetup::default())
         .await;
 
     PostgresTestSetup {
@@ -145,21 +195,11 @@ async fn setup_source() -> PostgresTestSetup {
     pool.close().await;
 
     let mut envs = HashMap::new();
-    envs.insert(
-        ENV_SOURCE_CONNECTION_STRING.to_owned(),
-        connection_string.clone(),
-    );
+    envs.insert(ENV_SOURCE_TABLES.to_owned(), format!("[{SOURCE_TABLE}]"));
+    envs.insert(ENV_SOURCE_INCLUDE_METADATA.to_owned(), "true".to_owned());
+    envs.insert(ENV_SOURCE_STREAMS_0_SCHEMA.to_owned(), "json".to_owned());
 
-    let mut runtime = setup_runtime();
-    runtime
-        .init("postgres/source.toml", Some(envs), IggySetup::default())
-        .await;
-
-    PostgresTestSetup {
-        runtime,
-        connection_string,
-        container,
-    }
+    setup_source_with_envs(container, connection_string, envs).await
 }
 
 async fn setup_source_bytea() -> PostgresTestSetup {
@@ -179,24 +219,14 @@ async fn setup_source_bytea() -> PostgresTestSetup {
 
     let mut envs = HashMap::new();
     envs.insert(
-        ENV_SOURCE_CONNECTION_STRING.to_owned(),
-        connection_string.clone(),
+        ENV_SOURCE_TABLES.to_owned(),
+        format!("[{SOURCE_TABLE_BYTEA}]"),
     );
+    envs.insert(ENV_SOURCE_PAYLOAD_COLUMN.to_owned(), "payload".to_owned());
+    envs.insert(ENV_SOURCE_PAYLOAD_FORMAT.to_owned(), "bytea".to_owned());
+    envs.insert(ENV_SOURCE_STREAMS_0_SCHEMA.to_owned(), "raw".to_owned());
 
-    let mut runtime = setup_runtime();
-    runtime
-        .init(
-            "postgres/source_bytea.toml",
-            Some(envs),
-            IggySetup::default(),
-        )
-        .await;
-
-    PostgresTestSetup {
-        runtime,
-        connection_string,
-        container,
-    }
+    setup_source_with_envs(container, connection_string, envs).await
 }
 
 async fn setup_source_json() -> PostgresTestSetup {
@@ -216,24 +246,17 @@ async fn setup_source_json() -> PostgresTestSetup {
 
     let mut envs = HashMap::new();
     envs.insert(
-        ENV_SOURCE_CONNECTION_STRING.to_owned(),
-        connection_string.clone(),
+        ENV_SOURCE_TABLES.to_owned(),
+        format!("[{SOURCE_TABLE_JSON}]"),
     );
+    envs.insert(ENV_SOURCE_PAYLOAD_COLUMN.to_owned(), "data".to_owned());
+    envs.insert(
+        ENV_SOURCE_PAYLOAD_FORMAT.to_owned(),
+        "json_direct".to_owned(),
+    );
+    envs.insert(ENV_SOURCE_STREAMS_0_SCHEMA.to_owned(), "json".to_owned());
 
-    let mut runtime = setup_runtime();
-    runtime
-        .init(
-            "postgres/source_json.toml",
-            Some(envs),
-            IggySetup::default(),
-        )
-        .await;
-
-    PostgresTestSetup {
-        runtime,
-        connection_string,
-        container,
-    }
+    setup_source_with_envs(container, connection_string, envs).await
 }
 
 async fn setup_source_delete() -> PostgresTestSetup {
@@ -254,24 +277,15 @@ async fn setup_source_delete() -> PostgresTestSetup {
 
     let mut envs = HashMap::new();
     envs.insert(
-        ENV_SOURCE_CONNECTION_STRING.to_owned(),
-        connection_string.clone(),
+        ENV_SOURCE_TABLES.to_owned(),
+        format!("[{SOURCE_TABLE_DELETE}]"),
     );
+    envs.insert(ENV_SOURCE_PRIMARY_KEY_COLUMN.to_owned(), "id".to_owned());
+    envs.insert(ENV_SOURCE_DELETE_AFTER_READ.to_owned(), "true".to_owned());
+    envs.insert(ENV_SOURCE_INCLUDE_METADATA.to_owned(), "true".to_owned());
+    envs.insert(ENV_SOURCE_STREAMS_0_SCHEMA.to_owned(), "json".to_owned());
 
-    let mut runtime = setup_runtime();
-    runtime
-        .init(
-            "postgres/source_delete.toml",
-            Some(envs),
-            IggySetup::default(),
-        )
-        .await;
-
-    PostgresTestSetup {
-        runtime,
-        connection_string,
-        container,
-    }
+    setup_source_with_envs(container, connection_string, envs).await
 }
 
 async fn setup_source_mark() -> PostgresTestSetup {
@@ -293,15 +307,46 @@ async fn setup_source_mark() -> PostgresTestSetup {
 
     let mut envs = HashMap::new();
     envs.insert(
+        ENV_SOURCE_TABLES.to_owned(),
+        format!("[{SOURCE_TABLE_MARK}]"),
+    );
+    envs.insert(ENV_SOURCE_PRIMARY_KEY_COLUMN.to_owned(), "id".to_owned());
+    envs.insert(
+        ENV_SOURCE_PROCESSED_COLUMN.to_owned(),
+        "is_processed".to_owned(),
+    );
+    envs.insert(ENV_SOURCE_INCLUDE_METADATA.to_owned(), "true".to_owned());
+    envs.insert(ENV_SOURCE_STREAMS_0_SCHEMA.to_owned(), "json".to_owned());
+
+    setup_source_with_envs(container, connection_string, envs).await
+}
+
+async fn setup_source_with_envs(
+    container: ContainerAsync<postgres::Postgres>,
+    connection_string: String,
+    mut extra_envs: HashMap<String, String>,
+) -> PostgresTestSetup {
+    extra_envs.insert(
         ENV_SOURCE_CONNECTION_STRING.to_owned(),
         connection_string.clone(),
     );
+    extra_envs.insert(ENV_SOURCE_TRACKING_COLUMN.to_owned(), "id".to_owned());
+    extra_envs.insert(
+        ENV_SOURCE_STREAMS_0_STREAM.to_owned(),
+        TEST_STREAM.to_owned(),
+    );
+    extra_envs.insert(ENV_SOURCE_STREAMS_0_TOPIC.to_owned(), TEST_TOPIC.to_owned());
+    extra_envs.insert(ENV_SOURCE_POLL_INTERVAL.to_owned(), "10ms".to_owned());
+    extra_envs.insert(
+        ENV_SOURCE_PATH.to_owned(),
+        "../../target/debug/libiggy_connector_postgres_source".to_owned(),
+    );
 
     let mut runtime = setup_runtime();
     runtime
         .init(
-            "postgres/source_mark.toml",
-            Some(envs),
+            "postgres/source.toml",
+            Some(extra_envs),
             IggySetup::default(),
         )
         .await;
diff --git a/core/integration/tests/connectors/postgres/sink.toml b/core/integration/tests/connectors/postgres/sink.toml
index 7336b47b1..61b3ddada 100644
--- a/core/integration/tests/connectors/postgres/sink.toml
+++ b/core/integration/tests/connectors/postgres/sink.toml
@@ -17,4 +17,4 @@
 
 [connectors]
 config_type = "local"
-config_dir = "tests/connectors/postgres/connectors_config"
+config_dir = "../connectors/sinks/postgres_sink"
diff --git a/core/integration/tests/connectors/postgres/sink_json.toml b/core/integration/tests/connectors/postgres/sink_json.toml
deleted file mode 100644
index 093005684..000000000
--- a/core/integration/tests/connectors/postgres/sink_json.toml
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[connectors]
-config_type = "local"
-config_dir = "tests/connectors/postgres/sink_json_config"
diff --git a/core/integration/tests/connectors/postgres/sink_json_config/sink.toml b/core/integration/tests/connectors/postgres/sink_json_config/sink.toml
deleted file mode 100644
index c55ff5b71..000000000
--- a/core/integration/tests/connectors/postgres/sink_json_config/sink.toml
+++ /dev/null
@@ -1,42 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-type = "sink"
-key = "postgres"
-enabled = true
-version = 0
-name = "Postgres sink json"
-path = "../../target/debug/libiggy_connector_postgres_sink"
-
-[[streams]]
-stream = "test_stream"
-topics = ["test_topic"]
-schema = "json"
-batch_length = 100
-poll_interval = "5ms"
-consumer_group = "test"
-
-[plugin_config]
-connection_string = ""
-target_table = "iggy_messages"
-batch_size = 100
-max_connections = 10
-auto_create_table = true
-include_metadata = true
-include_checksum = true
-include_origin_timestamp = true
-payload_format = "json"
diff --git a/core/integration/tests/connectors/postgres/sink_raw.toml b/core/integration/tests/connectors/postgres/sink_raw.toml
deleted file mode 100644
index da6ff86e5..000000000
--- a/core/integration/tests/connectors/postgres/sink_raw.toml
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[connectors]
-config_type = "local"
-config_dir = "tests/connectors/postgres/sink_raw_config"
diff --git a/core/integration/tests/connectors/postgres/sink_raw_config/sink.toml b/core/integration/tests/connectors/postgres/sink_raw_config/sink.toml
deleted file mode 100644
index 82b7bf400..000000000
--- a/core/integration/tests/connectors/postgres/sink_raw_config/sink.toml
+++ /dev/null
@@ -1,42 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-type = "sink"
-key = "postgres"
-enabled = true
-version = 0
-name = "Postgres sink raw"
-path = "../../target/debug/libiggy_connector_postgres_sink"
-
-[[streams]]
-stream = "test_stream"
-topics = ["test_topic"]
-schema = "raw"
-batch_length = 100
-poll_interval = "5ms"
-consumer_group = "test"
-
-[plugin_config]
-connection_string = ""
-target_table = "iggy_messages"
-batch_size = 100
-max_connections = 10
-auto_create_table = true
-include_metadata = true
-include_checksum = true
-include_origin_timestamp = true
-payload_format = "bytea"
diff --git a/core/integration/tests/connectors/postgres/source.toml b/core/integration/tests/connectors/postgres/source.toml
index 2da29f3f6..049329c08 100644
--- a/core/integration/tests/connectors/postgres/source.toml
+++ b/core/integration/tests/connectors/postgres/source.toml
@@ -17,4 +17,4 @@
 
 [connectors]
 config_type = "local"
-config_dir = "tests/connectors/postgres/source_config"
+config_dir = "../connectors/sources/postgres_source"
diff --git a/core/integration/tests/connectors/postgres/source_bytea.toml b/core/integration/tests/connectors/postgres/source_bytea.toml
deleted file mode 100644
index a815d1bae..000000000
--- a/core/integration/tests/connectors/postgres/source_bytea.toml
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[connectors]
-config_type = "local"
-config_dir = "tests/connectors/postgres/source_bytea_config"
diff --git a/core/integration/tests/connectors/postgres/source_bytea_config/source.toml b/core/integration/tests/connectors/postgres/source_bytea_config/source.toml
deleted file mode 100644
index c117ee82f..000000000
--- a/core/integration/tests/connectors/postgres/source_bytea_config/source.toml
+++ /dev/null
@@ -1,40 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-type = "source"
-key = "postgres"
-enabled = true
-version = 0
-name = "Postgres source bytea"
-path = "../../target/debug/libiggy_connector_postgres_source"
-
-[[streams]]
-stream = "test_stream"
-topic = "test_topic"
-schema = "raw"
-batch_length = 100
-
-[plugin_config]
-connection_string = ""
-mode = "polling"
-tables = ["test_payloads"]
-poll_interval = "5ms"
-batch_size = 100
-tracking_column = "id"
-max_connections = 10
-payload_column = "payload"
-payload_format = "bytea"
diff --git a/core/integration/tests/connectors/postgres/source_delete.toml b/core/integration/tests/connectors/postgres/source_delete.toml
deleted file mode 100644
index 52b7de053..000000000
--- a/core/integration/tests/connectors/postgres/source_delete.toml
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[connectors]
-config_type = "local"
-config_dir = "tests/connectors/postgres/source_delete_config"
diff --git a/core/integration/tests/connectors/postgres/source_delete_config/source.toml b/core/integration/tests/connectors/postgres/source_delete_config/source.toml
deleted file mode 100644
index 250508a8f..000000000
--- a/core/integration/tests/connectors/postgres/source_delete_config/source.toml
+++ /dev/null
@@ -1,41 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-type = "source"
-key = "postgres"
-enabled = true
-version = 0
-name = "Postgres source with delete after read"
-path = "../../target/debug/libiggy_connector_postgres_source"
-
-[[streams]]
-stream = "test_stream"
-topic = "test_topic"
-schema = "json"
-batch_length = 100
-
-[plugin_config]
-connection_string = ""
-mode = "polling"
-tables = ["test_delete_rows"]
-poll_interval = "5ms"
-batch_size = 100
-tracking_column = "id"
-primary_key_column = "id"
-delete_after_read = true
-max_connections = 10
-include_metadata = true
diff --git a/core/integration/tests/connectors/postgres/source_json.toml b/core/integration/tests/connectors/postgres/source_json.toml
deleted file mode 100644
index fa6d44610..000000000
--- a/core/integration/tests/connectors/postgres/source_json.toml
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[connectors]
-config_type = "local"
-config_dir = "tests/connectors/postgres/source_json_config"
diff --git a/core/integration/tests/connectors/postgres/source_json_config/source.toml b/core/integration/tests/connectors/postgres/source_json_config/source.toml
deleted file mode 100644
index 901776320..000000000
--- a/core/integration/tests/connectors/postgres/source_json_config/source.toml
+++ /dev/null
@@ -1,40 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-type = "source"
-key = "postgres"
-enabled = true
-version = 0
-name = "Postgres source json direct"
-path = "../../target/debug/libiggy_connector_postgres_source"
-
-[[streams]]
-stream = "test_stream"
-topic = "test_topic"
-schema = "json"
-batch_length = 100
-
-[plugin_config]
-connection_string = ""
-mode = "polling"
-tables = ["test_json_payloads"]
-poll_interval = "5ms"
-batch_size = 100
-tracking_column = "id"
-max_connections = 10
-payload_column = "data"
-payload_format = "json_direct"
diff --git a/core/integration/tests/connectors/postgres/source_mark.toml b/core/integration/tests/connectors/postgres/source_mark.toml
deleted file mode 100644
index 24143ac08..000000000
--- a/core/integration/tests/connectors/postgres/source_mark.toml
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[connectors]
-config_type = "local"
-config_dir = "tests/connectors/postgres/source_mark_config"
diff --git a/core/integration/tests/connectors/postgres/source_mark_config/source.toml b/core/integration/tests/connectors/postgres/source_mark_config/source.toml
deleted file mode 100644
index 51571e07b..000000000
--- a/core/integration/tests/connectors/postgres/source_mark_config/source.toml
+++ /dev/null
@@ -1,41 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-type = "source"
-key = "postgres"
-enabled = true
-version = 0
-name = "Postgres source with processed column"
-path = "../../target/debug/libiggy_connector_postgres_source"
-
-[[streams]]
-stream = "test_stream"
-topic = "test_topic"
-schema = "json"
-batch_length = 100
-
-[plugin_config]
-connection_string = ""
-mode = "polling"
-tables = ["test_mark_rows"]
-poll_interval = "5ms"
-batch_size = 100
-tracking_column = "id"
-primary_key_column = "id"
-processed_column = "is_processed"
-max_connections = 10
-include_metadata = true

Reply via email to