This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch postres_connector
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/postres_connector by this push:
new 2a18d33be Fix tests and postgres logic, add verbose flag
2a18d33be is described below
commit 2a18d33be1d1a6d4ecbdb71b5284261beb062c45
Author: spetz <[email protected]>
AuthorDate: Mon Jan 19 10:29:28 2026 +0100
Fix tests and postgres logic, add verbose flag
---
core/connectors/runtime/src/configs/connectors.rs | 10 +
.../src/configs/connectors/local_provider.rs | 32 +-
core/connectors/runtime/src/main.rs | 2 +
core/connectors/runtime/src/sink.rs | 36 +-
core/connectors/runtime/src/source.rs | 26 +-
core/connectors/sdk/src/lib.rs | 2 +
core/connectors/sinks/README.md | 2 +
.../sinks/postgres_sink/config.toml} | 12 +-
core/connectors/sinks/postgres_sink/src/lib.rs | 364 +++++++++++++--------
core/connectors/sources/README.md | 2 +
core/connectors/sources/postgres_source/README.md | 8 +-
.../sources/postgres_source/config.toml} | 17 +-
core/integration/tests/connectors/mod.rs | 12 +-
core/integration/tests/connectors/postgres/mod.rs | 187 +++++++----
.../tests/connectors/postgres/sink.toml | 2 +-
.../tests/connectors/postgres/sink_json.toml | 20 --
.../connectors/postgres/sink_json_config/sink.toml | 42 ---
.../tests/connectors/postgres/sink_raw.toml | 20 --
.../connectors/postgres/sink_raw_config/sink.toml | 42 ---
.../tests/connectors/postgres/source.toml | 2 +-
.../tests/connectors/postgres/source_bytea.toml | 20 --
.../postgres/source_bytea_config/source.toml | 40 ---
.../tests/connectors/postgres/source_delete.toml | 20 --
.../postgres/source_delete_config/source.toml | 41 ---
.../tests/connectors/postgres/source_json.toml | 20 --
.../postgres/source_json_config/source.toml | 40 ---
.../tests/connectors/postgres/source_mark.toml | 20 --
.../postgres/source_mark_config/source.toml | 41 ---
28 files changed, 466 insertions(+), 616 deletions(-)
diff --git a/core/connectors/runtime/src/configs/connectors.rs b/core/connectors/runtime/src/configs/connectors.rs
index 6bacdc7e4..14dd0bdf8 100644
--- a/core/connectors/runtime/src/configs/connectors.rs
+++ b/core/connectors/runtime/src/configs/connectors.rs
@@ -80,6 +80,8 @@ pub struct CreateSinkConfig {
pub streams: Vec<StreamConsumerConfig>,
pub plugin_config_format: Option<ConfigFormat>,
pub plugin_config: Option<serde_json::Value>,
+ #[serde(default)]
+ pub verbose: bool,
}
impl CreateSinkConfig {
@@ -94,6 +96,7 @@ impl CreateSinkConfig {
streams: self.streams.clone(),
plugin_config_format: self.plugin_config_format,
plugin_config: self.plugin_config.clone(),
+ verbose: self.verbose,
}
}
}
@@ -109,6 +112,8 @@ pub struct SinkConfig {
pub streams: Vec<StreamConsumerConfig>,
pub plugin_config_format: Option<ConfigFormat>,
pub plugin_config: Option<serde_json::Value>,
+ #[serde(default)]
+ pub verbose: bool,
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
@@ -120,6 +125,8 @@ pub struct CreateSourceConfig {
pub streams: Vec<StreamProducerConfig>,
pub plugin_config_format: Option<ConfigFormat>,
pub plugin_config: Option<serde_json::Value>,
+ #[serde(default)]
+ pub verbose: bool,
}
impl CreateSourceConfig {
@@ -134,6 +141,7 @@ impl CreateSourceConfig {
streams: self.streams.clone(),
plugin_config_format: self.plugin_config_format,
plugin_config: self.plugin_config.clone(),
+ verbose: self.verbose,
}
}
}
@@ -149,6 +157,8 @@ pub struct SourceConfig {
pub streams: Vec<StreamProducerConfig>,
pub plugin_config_format: Option<ConfigFormat>,
pub plugin_config: Option<serde_json::Value>,
+ #[serde(default)]
+ pub verbose: bool,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
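
The new verbose fields are marked #[serde(default)], so existing sink/source TOML files that omit the key still deserialize and simply default to false. A minimal sketch of that behavior, assuming only the serde and toml crates; DemoSinkConfig is a simplified stand-in, not the runtime's actual struct:

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct DemoSinkConfig {
    name: String,
    #[serde(default)]
    verbose: bool, // absent in older configs -> false
}

fn main() {
    // Older config without the flag still parses.
    let old: DemoSinkConfig = toml::from_str(r#"name = "Postgres sink""#).unwrap();
    assert!(!old.verbose);

    // New config opting into verbose logging.
    let new: DemoSinkConfig = toml::from_str("name = \"Postgres sink\"\nverbose = true").unwrap();
    assert!(new.verbose);
}
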
diff --git a/core/connectors/runtime/src/configs/connectors/local_provider.rs b/core/connectors/runtime/src/configs/connectors/local_provider.rs
index d3024e460..fb363d9c5 100644
--- a/core/connectors/runtime/src/configs/connectors/local_provider.rs
+++ b/core/connectors/runtime/src/configs/connectors/local_provider.rs
@@ -164,24 +164,36 @@ impl LocalConnectorsConfigProvider<Created> {
let sinks: DashMap<ConnectorId, SinkConfigFile> = DashMap::new();
let sources: DashMap<ConnectorId, SourceConfigFile> = DashMap::new();
- info!("Loading connectors configuration from: {}", self.config_dir);
+ let cwd = match std::env::current_dir() {
+ Ok(path) => path.display().to_string(),
+ Err(_) => "unknown".to_string(),
+ };
+ info!(
+            "Loading connectors configuration from: {}, current directory: {cwd}",
+ self.config_dir
+ );
let entries = std::fs::read_dir(&self.config_dir)?;
for entry in entries.flatten() {
let path = entry.path();
if path.is_file() {
                if path.extension().and_then(|ext| ext.to_str()) != Some("toml") {
- debug!("Skipping non-TOML file: {:?}", path);
+ debug!("Skipping non-TOML file: {}", path.display());
continue;
}
-                    if let Some(file_name) = path.file_name().and_then(|n| n.to_str())
- && file_name.starts_with('.')
- {
- debug!("Skipping hidden file: {:?}", path);
- continue;
+                    if let Some(file_name) = path.file_name().and_then(|n| n.to_str()) {
+ if file_name.starts_with('.') {
+ debug!("Skipping hidden file: {}", path.display());
+ continue;
+ }
+ let file_name_lower = file_name.to_lowercase();
+ if file_name_lower == "cargo.toml" {
+ debug!("Skipping Cargo.toml: {}", path.display());
+ continue;
+ }
}
- debug!("Loading connector configuration from: {:?}", path);
+                info!("Loading connector configuration from: {}", path.display());
let base_config = Self::read_base_config(&path)?;
debug!("Loaded base configuration: {:?}", base_config);
let path = path
@@ -229,8 +241,8 @@ impl LocalConnectorsConfigProvider<Created> {
}
}
- debug!(
-            "Loaded connector configuration with key {}, version {}, created at {}",
+ info!(
+            "Loaded connector configuration with key: {}, version: {}, created at {}",
base_config.key(),
version,
created_at.to_rfc3339()
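
The loader above now skips anything that is not a .toml file, hidden files, and Cargo.toml before attempting to parse a connector config. A self-contained sketch of those skip rules, using a hypothetical helper name rather than the runtime's actual function (the loader additionally checks entry.path().is_file() while walking the directory):

use std::path::Path;

// Hypothetical helper mirroring the skip rules in the hunk above.
fn is_connector_config(path: &Path) -> bool {
    // Only .toml files are treated as connector configs.
    if path.extension().and_then(|ext| ext.to_str()) != Some("toml") {
        return false;
    }
    let Some(file_name) = path.file_name().and_then(|n| n.to_str()) else {
        return false;
    };
    // Hidden files and crate manifests are skipped.
    !(file_name.starts_with('.') || file_name.to_lowercase() == "cargo.toml")
}

fn main() {
    for name in ["sink.toml", ".hidden.toml", "Cargo.toml", "README.md"] {
        println!("{name}: {}", is_connector_config(Path::new(name)));
    }
}
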
diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs
index da0ac4214..d600fc7eb 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -276,6 +276,7 @@ struct SinkConnectorPlugin {
config_format: Option<ConfigFormat>,
consumers: Vec<SinkConnectorConsumer>,
error: Option<String>,
+ verbose: bool,
}
struct SinkConnectorConsumer {
@@ -310,6 +311,7 @@ struct SourceConnectorPlugin {
producer: Option<SourceConnectorProducer>,
state_storage: StateStorage,
error: Option<String>,
+ verbose: bool,
}
struct SourceConnectorProducer {
diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs
index abd04971e..c9d8bd3da 100644
--- a/core/connectors/runtime/src/sink.rs
+++ b/core/connectors/runtime/src/sink.rs
@@ -40,7 +40,7 @@ use std::{
sync::{Arc, atomic::Ordering},
time::Instant,
};
-use tracing::{error, info, warn};
+use tracing::{debug, error, info, warn};
pub async fn init(
sink_configs: HashMap<String, SinkConfig>,
@@ -78,6 +78,7 @@ pub async fn init(
config_format: config.plugin_config_format,
consumers: vec![],
error: init_error.clone(),
+ verbose: config.verbose,
});
} else {
let container: Container<SinkApi> =
@@ -102,6 +103,7 @@ pub async fn init(
config_format: config.plugin_config_format,
consumers: vec![],
error: init_error.clone(),
+ verbose: config.verbose,
}],
},
);
@@ -205,6 +207,7 @@ pub fn consume(sinks: Vec<SinkConnectorWrapper>, context: Arc<RuntimeContext>) {
sink.callback,
consumer.transforms,
consumer.consumer,
+ plugin.verbose,
)
.await
{
@@ -233,6 +236,7 @@ async fn consume_messages(
consume: ConsumeCallback,
transforms: Vec<Arc<dyn Transform>>,
mut consumer: IggyConsumer,
+ verbose: bool,
) -> Result<(), RuntimeError> {
info!("Started consuming messages for sink connector with ID:
{plugin_id}");
let batch_size = batch_size as usize;
@@ -263,10 +267,17 @@ async fn consume_messages(
current_offset,
schema: decoder.schema(),
};
- info!(
-            "Processing {messages_count} messages for sink connector with ID: {}",
- plugin_id
- );
+ if verbose {
+ info!(
+                "Processing {messages_count} messages for sink connector with ID: {}",
+ plugin_id
+ );
+ } else {
+ debug!(
+                "Processing {messages_count} messages for sink connector with ID: {}",
+ plugin_id
+ );
+ }
let start = Instant::now();
if let Err(error) = process_messages(
plugin_id,
@@ -286,10 +297,17 @@ async fn consume_messages(
}
let elapsed = start.elapsed();
- info!(
-            "Consumed {messages_count} messages in {:#?} for sink connector with ID: {plugin_id}",
- elapsed
- );
+ if verbose {
+ info!(
+                "Consumed {messages_count} messages in {:#?} for sink connector with ID: {plugin_id}",
+ elapsed
+ );
+ } else {
+ debug!(
+                "Consumed {messages_count} messages in {:#?} for sink connector with ID: {plugin_id}",
+ elapsed
+ );
+ }
}
info!("Stopped consuming messages for sink connector with ID:
{plugin_id}");
Ok(())
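
Both sides of the runtime now repeat the same pattern: log at info when the connector's verbose flag is set, otherwise at debug. The commit keeps the explicit if/else branches shown above; below is only a hedged sketch of how the choice could be factored into a small macro over the tracing crate (tracing_subscriber is assumed purely to make the output visible), not part of this change:

// Illustrative only: route the same message to info! or debug! based on the
// connector's verbose flag. The runtime itself keeps explicit if/else branches.
macro_rules! log_verbose {
    ($verbose:expr, $($arg:tt)*) => {
        if $verbose {
            tracing::info!($($arg)*);
        } else {
            tracing::debug!($($arg)*);
        }
    };
}

fn main() {
    // Assumed dev dependency, used here only so the log line is printed.
    tracing_subscriber::fmt().init();
    let (messages_count, plugin_id, verbose) = (42, 1, true);
    log_verbose!(verbose, "Processing {messages_count} messages for sink connector with ID: {plugin_id}");
}
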
diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs
index 26af0eaff..5d6d88744 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -91,6 +91,7 @@ pub async fn init(
transforms: vec![],
state_storage,
error: init_error.clone(),
+ verbose: config.verbose,
});
} else {
let container: Container<SourceApi> =
@@ -118,6 +119,7 @@ pub async fn init(
transforms: vec![],
state_storage,
error: init_error.clone(),
+ verbose: config.verbose,
}],
},
);
@@ -267,7 +269,11 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>, context: Arc<RuntimeContext>
while let Ok(produced_messages) = receiver.recv_async().await {
let count = produced_messages.messages.len();
-                    info!("Source connector with ID: {plugin_id} received {count} messages",);
+ if plugin.verbose {
+                        info!("Source connector with ID: {plugin_id} received {count} messages",);
+ } else {
+                        debug!("Source connector with ID: {plugin_id} received {count} messages",);
+ }
let schema = produced_messages.schema;
            let mut messages: Vec<DecodedMessage> = Vec::with_capacity(count);
for message in produced_messages.messages {
@@ -322,11 +328,19 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>, context: Arc<RuntimeContext>
continue;
}
- info!(
-                        "Sent {count} messages to stream: {}, topic: {} by source connector with ID: {plugin_id}",
- producer.stream(),
- producer.topic()
- );
+ if plugin.verbose {
+ info!(
+                            "Sent {count} messages to stream: {}, topic: {} by source connector with ID: {plugin_id}",
+ producer.stream(),
+ producer.topic()
+ );
+ } else {
+ debug!(
+                            "Sent {count} messages to stream: {}, topic: {} by source connector with ID: {plugin_id}",
+ producer.stream(),
+ producer.topic()
+ );
+ }
let Some(state) = produced_messages.state else {
debug!("No state provided for source connector with
ID: {plugin_id}");
diff --git a/core/connectors/sdk/src/lib.rs b/core/connectors/sdk/src/lib.rs
index 98066f33d..b91ade3dc 100644
--- a/core/connectors/sdk/src/lib.rs
+++ b/core/connectors/sdk/src/lib.rs
@@ -324,4 +324,6 @@ pub enum Error {
InvalidState,
#[error("Connection error: {0}")]
Connection(String),
+ #[error("Cannot store data: {0}")]
+ CannotStoreData(String),
}
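
The SDK gains a CannotStoreData(String) variant so the sink can surface the underlying database error text instead of collapsing everything into InvalidRecord. A trimmed-down sketch, assuming the enum's #[error(...)] attributes come from a thiserror-style derive:

use thiserror::Error;

// Simplified stand-in for the SDK error enum, keeping only two variants.
#[derive(Debug, Error)]
enum DemoError {
    #[error("Connection error: {0}")]
    Connection(String),
    #[error("Cannot store data: {0}")]
    CannotStoreData(String),
}

fn main() {
    let err = DemoError::CannotStoreData("Batch insert failed after 3 attempts: timeout".into());
    // Prints: Cannot store data: Batch insert failed after 3 attempts: timeout
    println!("{err}");
    let _ = DemoError::Connection("refused".into());
}
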
diff --git a/core/connectors/sinks/README.md b/core/connectors/sinks/README.md
index f29b4b92f..a2782c8e6 100644
--- a/core/connectors/sinks/README.md
+++ b/core/connectors/sinks/README.md
@@ -40,6 +40,7 @@ pub struct SinkConfig {
pub streams: Vec<StreamConsumerConfig>,
pub plugin_config_format: Option<ConfigFormat>,
pub plugin_config: Option<serde_json::Value>,
+    pub verbose: bool, // Log message processing at info level instead of debug (default: false)
}
```
@@ -64,6 +65,7 @@ version = 0
name = "Stdout sink"
path = "target/release/libiggy_connector_stdout_sink"
plugin_config_format = "toml"
+verbose = false # Log message processing at info level instead of debug
# Collection of the streams from which messages are consumed
[[streams]]
diff --git a/core/integration/tests/connectors/postgres/connectors_config/sink.toml b/core/connectors/sinks/postgres_sink/config.toml
similarity index 81%
rename from core/integration/tests/connectors/postgres/connectors_config/sink.toml
rename to core/connectors/sinks/postgres_sink/config.toml
index 7476a2126..84a79def5 100644
--- a/core/integration/tests/connectors/postgres/connectors_config/sink.toml
+++ b/core/connectors/sinks/postgres_sink/config.toml
@@ -20,18 +20,19 @@ key = "postgres"
enabled = true
version = 0
name = "Postgres sink"
-path = "../../target/debug/libiggy_connector_postgres_sink"
+path = "../../../target/debug/libiggy_connector_postgres_sink"
+verbose = false
[[streams]]
-stream = "test_stream"
-topics = ["test_topic"]
+stream = "user_events"
+topics = ["users", "orders"]
schema = "json"
batch_length = 100
poll_interval = "5ms"
-consumer_group = "test"
+consumer_group = "postgres_sink"
[plugin_config]
-connection_string = ""
+connection_string = "postgresql://user:pass@localhost:5432/database"
target_table = "iggy_messages"
batch_size = 100
max_connections = 10
@@ -39,3 +40,4 @@ auto_create_table = true
include_metadata = true
include_checksum = true
include_origin_timestamp = true
+payload_format = "bytea"
diff --git a/core/connectors/sinks/postgres_sink/src/lib.rs b/core/connectors/sinks/postgres_sink/src/lib.rs
index 2acfb7a6a..03757b729 100644
--- a/core/connectors/sinks/postgres_sink/src/lib.rs
+++ b/core/connectors/sinks/postgres_sink/src/lib.rs
@@ -180,12 +180,13 @@ impl PostgresSink {
let pool = self.get_pool()?;
let table_name = &self.config.target_table;
+ let quoted_table = quote_identifier(table_name)?;
let include_metadata = self.config.include_metadata.unwrap_or(true);
let include_checksum = self.config.include_checksum.unwrap_or(true);
        let include_origin_timestamp = self.config.include_origin_timestamp.unwrap_or(true);
let payload_type = self.payload_format().sql_type();
- let mut sql = format!("CREATE TABLE IF NOT EXISTS {table_name} (");
+ let mut sql = format!("CREATE TABLE IF NOT EXISTS {quoted_table} (");
sql.push_str("id DECIMAL(39, 0) PRIMARY KEY");
if include_metadata {
@@ -263,110 +264,160 @@ impl PostgresSink {
messages_metadata: &MessagesMetadata,
pool: &Pool<Postgres>,
) -> Result<(), Error> {
+ if messages.is_empty() {
+ return Ok(());
+ }
+
let table_name = &self.config.target_table;
let include_metadata = self.config.include_metadata.unwrap_or(true);
let include_checksum = self.config.include_checksum.unwrap_or(true);
        let include_origin_timestamp = self.config.include_origin_timestamp.unwrap_or(true);
let payload_format = self.payload_format();
- for message in messages {
- let payload_bytes =
message.payload.clone().try_into_vec().map_err(|e| {
- error!("Failed to convert payload to bytes: {e}");
- Error::InvalidRecord
- })?;
-
-            let json_value = self.parse_json_payload(&payload_bytes, payload_format)?;
-            let text_value = self.parse_text_payload(&payload_bytes, payload_format)?;
-
- let (query, _) = self.build_insert_query(
- table_name,
- include_metadata,
- include_checksum,
- include_origin_timestamp,
- );
-
- let timestamp = self.parse_timestamp(message.timestamp);
-            let origin_timestamp = self.parse_timestamp(message.origin_timestamp);
-
- self.execute_insert_with_retry(
- pool,
- &query,
- message,
- topic_metadata,
- messages_metadata,
- include_metadata,
- include_checksum,
- include_origin_timestamp,
- timestamp,
- origin_timestamp,
- payload_format,
- &payload_bytes,
- &json_value,
- &text_value,
- )
- .await?;
- }
-
- Ok(())
+ let (query, _params_per_row) = self.build_batch_insert_query(
+ table_name,
+ include_metadata,
+ include_checksum,
+ include_origin_timestamp,
+ messages.len(),
+ )?;
+
+ self.execute_batch_insert_with_retry(
+ pool,
+ &query,
+ messages,
+ topic_metadata,
+ messages_metadata,
+ include_metadata,
+ include_checksum,
+ include_origin_timestamp,
+ payload_format,
+ )
+ .await
}
#[allow(clippy::too_many_arguments)]
- async fn execute_insert_with_retry(
+ async fn execute_batch_insert_with_retry(
&self,
pool: &Pool<Postgres>,
query: &str,
- message: &ConsumedMessage,
+ messages: &[ConsumedMessage],
topic_metadata: &TopicMetadata,
messages_metadata: &MessagesMetadata,
include_metadata: bool,
include_checksum: bool,
include_origin_timestamp: bool,
- timestamp: DateTime<Utc>,
- origin_timestamp: DateTime<Utc>,
payload_format: PayloadFormat,
- payload_bytes: &[u8],
- json_value: &Option<serde_json::Value>,
- text_value: &Option<String>,
) -> Result<(), Error> {
let max_retries = self.get_max_retries();
let retry_delay = self.get_retry_delay();
let mut attempts = 0u32;
loop {
-            let mut query_obj = sqlx::query(query).bind(message.id.to_string());
+ let result = self
+ .bind_and_execute_batch(
+ pool,
+ query,
+ messages,
+ topic_metadata,
+ messages_metadata,
+ include_metadata,
+ include_checksum,
+ include_origin_timestamp,
+ payload_format,
+ )
+ .await;
+
+ match result {
+ Ok(_) => return Ok(()),
+ Err((e, is_transient)) => {
+ attempts += 1;
+ if !is_transient || attempts >= max_retries {
+                        error!("Batch insert failed after {attempts} attempts: {e}");
+ return Err(Error::CannotStoreData(format!(
+                            "Batch insert failed after {attempts} attempts: {e}"
+ )));
+ }
+ warn!(
+                        "Transient database error (attempt {attempts}/{max_retries}): {e}. Retrying..."
+ );
+ tokio::time::sleep(retry_delay * attempts).await;
+ }
+ }
+ }
+ }
+
+ #[allow(clippy::too_many_arguments)]
+ async fn bind_and_execute_batch(
+ &self,
+ pool: &Pool<Postgres>,
+ query: &str,
+ messages: &[ConsumedMessage],
+ topic_metadata: &TopicMetadata,
+ messages_metadata: &MessagesMetadata,
+ include_metadata: bool,
+ include_checksum: bool,
+ include_origin_timestamp: bool,
+ payload_format: PayloadFormat,
+ ) -> Result<(), (sqlx::Error, bool)> {
+ let mut query_builder = sqlx::query(query);
+
+ for message in messages {
+            let payload_bytes = message.payload.clone().try_into_vec().map_err(|e| {
+                let err_msg = format!("Failed to convert payload to bytes: {e}");
+ (sqlx::Error::Protocol(err_msg), false)
+ })?;
+
+ let timestamp = self.parse_timestamp(message.timestamp);
+            let origin_timestamp_val = self.parse_timestamp(message.origin_timestamp);
+
+ query_builder = query_builder.bind(message.id.to_string());
if include_metadata {
- query_obj = query_obj
+ query_builder = query_builder
.bind(message.offset as i64)
.bind(timestamp)
- .bind(&topic_metadata.stream)
- .bind(&topic_metadata.topic)
+ .bind(topic_metadata.stream.clone())
+ .bind(topic_metadata.topic.clone())
.bind(messages_metadata.partition_id as i32);
}
if include_checksum {
- query_obj = query_obj.bind(message.checksum as i64);
+ query_builder = query_builder.bind(message.checksum as i64);
}
if include_origin_timestamp {
- query_obj = query_obj.bind(origin_timestamp);
+ query_builder = query_builder.bind(origin_timestamp_val);
}
- query_obj = match payload_format {
- PayloadFormat::Bytea => query_obj.bind(payload_bytes.to_vec()),
- PayloadFormat::Json => query_obj.bind(json_value.clone()),
- PayloadFormat::Text => query_obj.bind(text_value.clone()),
- };
-
- match query_obj.execute(pool).await {
- Ok(_) => return Ok(()),
- Err(e) => {
- attempts += 1;
- handle_retry_error(e, attempts, max_retries)?;
- tokio::time::sleep(retry_delay * attempts).await;
+ query_builder = match payload_format {
+ PayloadFormat::Bytea => query_builder.bind(payload_bytes),
+ PayloadFormat::Json => {
+                    let json_value: serde_json::Value = serde_json::from_slice(&payload_bytes)
+ .map_err(|e| {
+                            let err_msg = format!("Failed to parse payload as JSON: {e}");
+ error!("{err_msg}");
+ (sqlx::Error::Protocol(err_msg), false)
+ })?;
+ query_builder.bind(json_value)
}
- }
+ PayloadFormat::Text => {
+                    let text_value = String::from_utf8(payload_bytes).map_err(|e| {
+                        let err_msg = format!("Failed to parse payload as UTF-8 text: {e}");
+ error!("{err_msg}");
+ (sqlx::Error::Protocol(err_msg), false)
+ })?;
+ query_builder.bind(text_value)
+ }
+ };
}
+
+ query_builder.execute(pool).await.map_err(|e| {
+ let is_transient = is_transient_error(&e);
+ (e, is_transient)
+ })?;
+
+ Ok(())
}
fn get_pool(&self) -> Result<&Pool<Postgres>, Error> {
@@ -402,90 +453,56 @@ impl PostgresSink {
.unwrap_or_else(Utc::now)
}
- fn parse_json_payload(
- &self,
- payload_bytes: &[u8],
- format: PayloadFormat,
- ) -> Result<Option<serde_json::Value>, Error> {
- match format {
- PayloadFormat::Json => {
- let value = serde_json::from_slice(payload_bytes).map_err(|e| {
- error!("Failed to parse payload as JSON: {e}");
- Error::InvalidRecord
- })?;
- Ok(Some(value))
- }
- _ => Ok(None),
- }
- }
-
- fn parse_text_payload(
- &self,
- payload_bytes: &[u8],
- format: PayloadFormat,
- ) -> Result<Option<String>, Error> {
- match format {
- PayloadFormat::Text => {
-                let value = String::from_utf8(payload_bytes.to_vec()).map_err(|e| {
- error!("Failed to parse payload as UTF-8 text: {e}");
- Error::InvalidRecord
- })?;
- Ok(Some(value))
- }
- _ => Ok(None),
- }
- }
-
- fn build_insert_query(
+ fn build_batch_insert_query(
&self,
table_name: &str,
include_metadata: bool,
include_checksum: bool,
include_origin_timestamp: bool,
- ) -> (String, u32) {
- let mut query = format!("INSERT INTO {table_name} (id");
- let mut values = "($1::numeric".to_string();
- let mut param_count = 1;
+ row_count: usize,
+ ) -> Result<(String, u32), Error> {
+ let quoted_table = quote_identifier(table_name)?;
+ let mut query = format!("INSERT INTO {quoted_table} (id");
+
+ let mut params_per_row: u32 = 1; // id
if include_metadata {
query.push_str(
", iggy_offset, iggy_timestamp, iggy_stream, iggy_topic,
iggy_partition_id",
);
- for i in 2..=6 {
- values.push_str(&format!(", ${i}"));
- }
- param_count = 6;
+ params_per_row += 5;
}
if include_checksum {
- param_count += 1;
query.push_str(", iggy_checksum");
- values.push_str(&format!(", ${param_count}"));
+ params_per_row += 1;
}
if include_origin_timestamp {
- param_count += 1;
query.push_str(", iggy_origin_timestamp");
- values.push_str(&format!(", ${param_count}"));
+ params_per_row += 1;
}
- param_count += 1;
query.push_str(", payload");
- values.push_str(&format!(", ${param_count}"));
+ params_per_row += 1;
- query.push_str(&format!(") VALUES {values})"));
+ query.push_str(") VALUES ");
- (query, param_count)
- }
-}
+ let mut value_groups = Vec::with_capacity(row_count);
+ for row_idx in 0..row_count {
+ let base_param = (row_idx as u32) * params_per_row;
+ let mut values = format!("(${}::numeric", base_param + 1);
+ for i in 2..=params_per_row {
+ values.push_str(&format!(", ${}", base_param + i));
+ }
+ values.push(')');
+ value_groups.push(values);
+ }
-fn handle_retry_error(e: sqlx::Error, attempts: u32, max_retries: u32) -> Result<bool, Error> {
- if attempts >= max_retries || !is_transient_error(&e) {
- error!("Database operation failed after {attempts} attempts: {e}");
- return Err(Error::InvalidRecord);
+ query.push_str(&value_groups.join(", "));
+
+ Ok((query, params_per_row))
}
-    warn!("Transient database error (attempt {attempts}/{max_retries}): {e}. Retrying...");
- Ok(true)
}
fn is_transient_error(e: &sqlx::Error) -> bool {
@@ -504,6 +521,19 @@ fn is_transient_error(e: &sqlx::Error) -> bool {
}
}
+fn quote_identifier(name: &str) -> Result<String, Error> {
+ if name.is_empty() {
+ return Err(Error::InitError("Table name cannot be empty".to_string()));
+ }
+ if name.contains('\0') {
+ return Err(Error::InitError(
+ "Table name cannot contain null characters".to_string(),
+ ));
+ }
+ let escaped = name.replace('"', "\"\"");
+ Ok(format!("\"{escaped}\""))
+}
+
fn redact_connection_string(conn_str: &str) -> String {
if let Some(scheme_end) = conn_str.find("://") {
let scheme = &conn_str[..scheme_end + 3];
@@ -587,9 +617,11 @@ mod tests {
#[test]
fn given_all_options_enabled_should_build_full_insert_query() {
let sink = PostgresSink::new(1, test_config());
-        let (query, param_count) = sink.build_insert_query("messages", true, true, true);
+ let (query, param_count) = sink
+ .build_batch_insert_query("messages", true, true, true, 1)
+ .expect("Failed to build query");
- assert!(query.contains("INSERT INTO messages"));
+ assert!(query.contains("INSERT INTO \"messages\""));
assert!(query.contains("iggy_offset"));
assert!(query.contains("iggy_timestamp"));
assert!(query.contains("iggy_stream"));
@@ -604,9 +636,11 @@ mod tests {
#[test]
fn given_metadata_disabled_should_build_minimal_insert_query() {
let sink = PostgresSink::new(1, test_config());
-        let (query, param_count) = sink.build_insert_query("messages", false, false, false);
+ let (query, param_count) = sink
+ .build_batch_insert_query("messages", false, false, false, 1)
+ .expect("Failed to build query");
- assert!(query.contains("INSERT INTO messages"));
+ assert!(query.contains("INSERT INTO \"messages\""));
assert!(!query.contains("iggy_offset"));
assert!(!query.contains("iggy_checksum"));
assert!(!query.contains("iggy_origin_timestamp"));
@@ -617,7 +651,9 @@ mod tests {
#[test]
fn given_only_checksum_enabled_should_include_checksum() {
let sink = PostgresSink::new(1, test_config());
-        let (query, param_count) = sink.build_insert_query("messages", false, true, false);
+ let (query, param_count) = sink
+ .build_batch_insert_query("messages", false, true, false, 1)
+ .expect("Failed to build query");
assert!(!query.contains("iggy_offset"));
assert!(query.contains("iggy_checksum"));
@@ -696,4 +732,72 @@ mod tests {
let redacted = redact_connection_string(conn);
assert_eq!(redacted, "postgresql://adm***");
}
+
+ #[test]
+ fn given_special_chars_in_identifier_should_escape() {
+ let result = quote_identifier("table\"name").expect("Failed to quote");
+ assert_eq!(result, "\"table\"\"name\"");
+ }
+
+ #[test]
+ fn given_empty_identifier_should_fail() {
+ let result = quote_identifier("");
+ assert!(result.is_err());
+ }
+
+ #[test]
+ fn given_null_char_in_identifier_should_fail() {
+ let result = quote_identifier("table\0name");
+ assert!(result.is_err());
+ }
+
+ #[test]
+ fn given_normal_identifier_should_quote() {
+ let result = quote_identifier("my_table").expect("Failed to quote");
+ assert_eq!(result, "\"my_table\"");
+ }
+
+ #[test]
+ fn given_identifier_with_spaces_should_quote() {
+ let result = quote_identifier("my table").expect("Failed to quote");
+ assert_eq!(result, "\"my table\"");
+ }
+
+ #[test]
+ fn given_identifier_with_sql_injection_should_escape() {
+        let result = quote_identifier("messages\"; DROP TABLE users; --").expect("Failed to quote");
+ assert_eq!(result, "\"messages\"\"; DROP TABLE users; --\"");
+ }
+
+ #[test]
+ fn given_batch_of_3_rows_should_build_multi_row_insert_query() {
+ let sink = PostgresSink::new(1, test_config());
+ let (query, params_per_row) = sink
+ .build_batch_insert_query("messages", true, true, true, 3)
+ .expect("Failed to build batch query");
+
+        // With all options: id + 5 metadata + checksum + origin_timestamp + payload = 9 params per row
+ assert_eq!(params_per_row, 9);
+
+ // Should have 3 value groups
+        assert!(query.contains("($1::numeric, $2, $3, $4, $5, $6, $7, $8, $9)"));
+        assert!(query.contains("($10::numeric, $11, $12, $13, $14, $15, $16, $17, $18)"));
+        assert!(query.contains("($19::numeric, $20, $21, $22, $23, $24, $25, $26, $27)"));
+ }
+
+ #[test]
+ fn given_batch_of_2_rows_minimal_should_build_correct_query() {
+ let sink = PostgresSink::new(1, test_config());
+ let (query, params_per_row) = sink
+ .build_batch_insert_query("messages", false, false, false, 2)
+ .expect("Failed to build batch query");
+
+ // With minimal options: id + payload = 2 params per row
+ assert_eq!(params_per_row, 2);
+
+ // Should have 2 value groups
+ assert!(query.contains("($1::numeric, $2)"));
+ assert!(query.contains("($3::numeric, $4)"));
+ assert!(!query.contains("$5"));
+ }
}
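
The rewritten sink builds one multi-row INSERT per batch, so placeholder numbers keep increasing across rows instead of restarting at $1 per message. A small standalone sketch of that placeholder arithmetic, matching the expectations in the batch query tests above (the real builder also assembles the optional metadata, checksum, and origin-timestamp columns):

// Sketch of the multi-row VALUES numbering: row r with p params per row
// occupies placeholders $(r*p + 1) through $((r + 1)*p).
fn value_groups(params_per_row: u32, row_count: usize) -> String {
    let mut groups = Vec::with_capacity(row_count);
    for row_idx in 0..row_count as u32 {
        let base = row_idx * params_per_row;
        let mut group = format!("(${}::numeric", base + 1);
        for i in 2..=params_per_row {
            group.push_str(&format!(", ${}", base + i));
        }
        group.push(')');
        groups.push(group);
    }
    groups.join(", ")
}

fn main() {
    // Minimal columns (id + payload), two rows:
    assert_eq!(value_groups(2, 2), "($1::numeric, $2), ($3::numeric, $4)");
    // All columns enabled (9 params per row), three rows:
    println!("{}", value_groups(9, 3));
}
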
diff --git a/core/connectors/sources/README.md b/core/connectors/sources/README.md
index 4bdb8e985..90c453545 100644
--- a/core/connectors/sources/README.md
+++ b/core/connectors/sources/README.md
@@ -35,6 +35,7 @@ pub struct SourceConfig {
pub streams: Vec<StreamProducerConfig>,
pub plugin_config_format: Option<ConfigFormat>,
pub plugin_config: Option<serde_json::Value>,
+    pub verbose: bool, // Log message processing at info level instead of debug (default: false)
}
```
@@ -59,6 +60,7 @@ version = 0
name = "Random source" # Name of the source
path = "libiggy_connector_random_source" # Path to the source connector
config_format = "toml"
+verbose = false # Log message processing at info level instead of debug
# Collection of the streams to which the produced messages are sent
[[streams]]
diff --git a/core/connectors/sources/postgres_source/README.md b/core/connectors/sources/postgres_source/README.md
index 0656f4ef5..000f3fb1f 100644
--- a/core/connectors/sources/postgres_source/README.md
+++ b/core/connectors/sources/postgres_source/README.md
@@ -21,7 +21,7 @@ The PostgreSQL source connector fetches data from PostgreSQL databases and strea
connection_string = "postgresql://user:pass@localhost:5432/database"
mode = "polling"
tables = ["users", "orders"]
-poll_interval = "30s"
+poll_interval = "1s"
batch_size = 1000
tracking_column = "id"
initial_offset = "0"
@@ -56,7 +56,7 @@ cdc_backend = "builtin"
| `connection_string` | string | required | PostgreSQL connection string |
| `mode` | string | required | `polling` or `cdc` |
| `tables` | array | required | List of tables to monitor |
-| `poll_interval` | string | `10s` | How often to poll (e.g., `30s`, `5m`) |
+| `poll_interval` | string | `1s` | How often to poll (e.g., `1s`, `5m`) |
| `batch_size` | u32 | `1000` | Max rows per poll |
| `tracking_column` | string | `id` | Column for incremental updates |
| `initial_offset` | string | none | Starting value for tracking column |
@@ -289,7 +289,7 @@ batch_length = 100
connection_string = "postgresql://user:pass@localhost:5432/mydb"
mode = "polling"
tables = ["users"]
-poll_interval = "10s"
+poll_interval = "1s"
tracking_column = "updated_at"
```
@@ -326,7 +326,7 @@ batch_length = 100
connection_string = "postgresql://user:pass@localhost:5432/mydb"
mode = "polling"
tables = ["events"]
-poll_interval = "10s"
+poll_interval = "1s"
tracking_column = "id"
payload_column = "data"
payload_format = "json_direct"
diff --git a/core/integration/tests/connectors/postgres/source_config/source.toml b/core/connectors/sources/postgres_source/config.toml
similarity index 76%
rename from core/integration/tests/connectors/postgres/source_config/source.toml
rename to core/connectors/sources/postgres_source/config.toml
index ac5a36e8c..be53647af 100644
--- a/core/integration/tests/connectors/postgres/source_config/source.toml
+++ b/core/connectors/sources/postgres_source/config.toml
@@ -20,20 +20,23 @@ key = "postgres"
enabled = true
version = 0
name = "Postgres source"
-path = "../../target/debug/libiggy_connector_postgres_source"
+path = "../../../target/debug/libiggy_connector_postgres_source"
+verbose = false
[[streams]]
-stream = "test_stream"
-topic = "test_topic"
+stream = "user_events"
+topic = "users"
schema = "json"
batch_length = 100
[plugin_config]
-connection_string = ""
+connection_string = "postgresql://user:pass@localhost:5432/database"
mode = "polling"
-tables = ["test_messages"]
-poll_interval = "5ms"
-batch_size = 100
+tables = ["users", "orders"]
+poll_interval = "1s"
+batch_size = 1000
tracking_column = "id"
+initial_offset = "0"
max_connections = 10
+snake_case_columns = false
include_metadata = true
diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs
index 7f468c35e..ffbbf0dca 100644
--- a/core/integration/tests/connectors/mod.rs
+++ b/core/integration/tests/connectors/mod.rs
@@ -79,14 +79,14 @@ struct ConnectorsIggyClient {
fn create_test_messages(count: usize) -> Vec<TestMessage> {
let base_timestamp = IggyTimestamp::now().as_micros();
- (0..count)
+ (1..=count)
.map(|i| TestMessage {
id: i as u64,
- name: format!("user_{i}"),
- count: (i * 10) as u32,
- amount: i as f64 * 99.99,
- active: i % 2 == 0,
- timestamp: (base_timestamp + i as u64 * ONE_DAY_MICROS) as i64,
+ name: format!("user_{}", i - 1),
+ count: ((i - 1) * 10) as u32,
+ amount: (i - 1) as f64 * 99.99,
+ active: (i - 1) % 2 == 0,
+            timestamp: (base_timestamp + (i - 1) as u64 * ONE_DAY_MICROS) as i64,
})
.collect()
}
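
The helper now iterates 1..=count so message ids start at 1, while every derived field uses i - 1 to keep exactly the values the existing assertions expect. A tiny sketch of the resulting mapping:

// Sketch: ids shift to 1..=count while derived fields keep their old values.
fn main() {
    let count = 3usize;
    for i in 1..=count {
        let id = i as u64;                      // 1, 2, 3
        let name = format!("user_{}", i - 1);   // user_0, user_1, user_2
        let amount = (i - 1) as f64 * 99.99;    // 0.0, 99.99, 199.98
        let active = (i - 1) % 2 == 0;          // true, false, true
        println!("{id} {name} {amount} {active}");
    }
}
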
diff --git a/core/integration/tests/connectors/postgres/mod.rs b/core/integration/tests/connectors/postgres/mod.rs
index ec4be1c00..e6e41bbb2 100644
--- a/core/integration/tests/connectors/postgres/mod.rs
+++ b/core/integration/tests/connectors/postgres/mod.rs
@@ -51,8 +51,37 @@ const FETCH_INTERVAL_MS: u64 = 10;
const TABLE_WAIT_INTERVAL_MS: u64 = 50;
const ENV_SINK_CONNECTION_STRING: &str =
"IGGY_CONNECTORS_SINK_POSTGRES_PLUGIN_CONFIG_CONNECTION_STRING";
+const ENV_SINK_TARGET_TABLE: &str = "IGGY_CONNECTORS_SINK_POSTGRES_PLUGIN_CONFIG_TARGET_TABLE";
+const ENV_SINK_PAYLOAD_FORMAT: &str = "IGGY_CONNECTORS_SINK_POSTGRES_PLUGIN_CONFIG_PAYLOAD_FORMAT";
+const ENV_SINK_STREAMS_0_STREAM: &str = "IGGY_CONNECTORS_SINK_POSTGRES_STREAMS_0_STREAM";
+const ENV_SINK_STREAMS_0_TOPICS: &str = "IGGY_CONNECTORS_SINK_POSTGRES_STREAMS_0_TOPICS";
+const ENV_SINK_STREAMS_0_SCHEMA: &str = "IGGY_CONNECTORS_SINK_POSTGRES_STREAMS_0_SCHEMA";
+const ENV_SINK_STREAMS_0_CONSUMER_GROUP: &str =
+ "IGGY_CONNECTORS_SINK_POSTGRES_STREAMS_0_CONSUMER_GROUP";
const ENV_SOURCE_CONNECTION_STRING: &str =
"IGGY_CONNECTORS_SOURCE_POSTGRES_PLUGIN_CONFIG_CONNECTION_STRING";
+const ENV_SOURCE_TABLES: &str = "IGGY_CONNECTORS_SOURCE_POSTGRES_PLUGIN_CONFIG_TABLES";
+const ENV_SOURCE_TRACKING_COLUMN: &str =
+ "IGGY_CONNECTORS_SOURCE_POSTGRES_PLUGIN_CONFIG_TRACKING_COLUMN";
+const ENV_SOURCE_PAYLOAD_COLUMN: &str =
+ "IGGY_CONNECTORS_SOURCE_POSTGRES_PLUGIN_CONFIG_PAYLOAD_COLUMN";
+const ENV_SOURCE_PAYLOAD_FORMAT: &str =
+ "IGGY_CONNECTORS_SOURCE_POSTGRES_PLUGIN_CONFIG_PAYLOAD_FORMAT";
+const ENV_SOURCE_DELETE_AFTER_READ: &str =
+ "IGGY_CONNECTORS_SOURCE_POSTGRES_PLUGIN_CONFIG_DELETE_AFTER_READ";
+const ENV_SOURCE_PRIMARY_KEY_COLUMN: &str =
+ "IGGY_CONNECTORS_SOURCE_POSTGRES_PLUGIN_CONFIG_PRIMARY_KEY_COLUMN";
+const ENV_SOURCE_PROCESSED_COLUMN: &str =
+ "IGGY_CONNECTORS_SOURCE_POSTGRES_PLUGIN_CONFIG_PROCESSED_COLUMN";
+const ENV_SOURCE_INCLUDE_METADATA: &str =
+ "IGGY_CONNECTORS_SOURCE_POSTGRES_PLUGIN_CONFIG_INCLUDE_METADATA";
+const ENV_SOURCE_STREAMS_0_STREAM: &str = "IGGY_CONNECTORS_SOURCE_POSTGRES_STREAMS_0_STREAM";
+const ENV_SOURCE_STREAMS_0_TOPIC: &str = "IGGY_CONNECTORS_SOURCE_POSTGRES_STREAMS_0_TOPIC";
+const ENV_SOURCE_STREAMS_0_SCHEMA: &str = "IGGY_CONNECTORS_SOURCE_POSTGRES_STREAMS_0_SCHEMA";
+const ENV_SOURCE_POLL_INTERVAL: &str =
+ "IGGY_CONNECTORS_SOURCE_POSTGRES_PLUGIN_CONFIG_POLL_INTERVAL";
+const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_POSTGRES_PATH";
+const ENV_SOURCE_PATH: &str = "IGGY_CONNECTORS_SOURCE_POSTGRES_PATH";
type SinkRow = (i64, String, String, Vec<u8>);
type SinkJsonRow = (i64, serde_json::Value);
@@ -93,29 +122,50 @@ async fn create_pool(connection_string: &str) -> Pool<Postgres> {
}
async fn setup_sink() -> PostgresTestSetup {
- setup_sink_with_config("postgres/sink.toml").await
+ let mut envs = HashMap::new();
+ envs.insert(ENV_SINK_STREAMS_0_SCHEMA.to_owned(), "json".to_owned());
+ setup_sink_with_envs(envs).await
}
async fn setup_sink_bytea() -> PostgresTestSetup {
- setup_sink_with_config("postgres/sink_raw.toml").await
+ let mut envs = HashMap::new();
+ envs.insert(ENV_SINK_STREAMS_0_SCHEMA.to_owned(), "raw".to_owned());
+ envs.insert(ENV_SINK_PAYLOAD_FORMAT.to_owned(), "bytea".to_owned());
+ setup_sink_with_envs(envs).await
}
async fn setup_sink_json() -> PostgresTestSetup {
- setup_sink_with_config("postgres/sink_json.toml").await
+ let mut envs = HashMap::new();
+ envs.insert(ENV_SINK_STREAMS_0_SCHEMA.to_owned(), "json".to_owned());
+ envs.insert(ENV_SINK_PAYLOAD_FORMAT.to_owned(), "json".to_owned());
+ setup_sink_with_envs(envs).await
}
-async fn setup_sink_with_config(config_path: &str) -> PostgresTestSetup {
+async fn setup_sink_with_envs(mut extra_envs: HashMap<String, String>) -> PostgresTestSetup {
let (container, connection_string) = start_container().await;
- let mut envs = HashMap::new();
- envs.insert(
+ extra_envs.insert(
ENV_SINK_CONNECTION_STRING.to_owned(),
connection_string.clone(),
);
+ extra_envs.insert(ENV_SINK_TARGET_TABLE.to_owned(), SINK_TABLE.to_owned());
+    extra_envs.insert(ENV_SINK_STREAMS_0_STREAM.to_owned(), TEST_STREAM.to_owned());
+ extra_envs.insert(
+ ENV_SINK_STREAMS_0_TOPICS.to_owned(),
+ format!("[{TEST_TOPIC}]"),
+ );
+ extra_envs.insert(
+ ENV_SINK_STREAMS_0_CONSUMER_GROUP.to_owned(),
+ "test".to_owned(),
+ );
+ extra_envs.insert(
+ ENV_SINK_PATH.to_owned(),
+ "../../target/debug/libiggy_connector_postgres_sink".to_owned(),
+ );
let mut runtime = setup_runtime();
runtime
- .init(config_path, Some(envs), IggySetup::default())
+ .init("postgres/sink.toml", Some(extra_envs), IggySetup::default())
.await;
PostgresTestSetup {
@@ -145,21 +195,11 @@ async fn setup_source() -> PostgresTestSetup {
pool.close().await;
let mut envs = HashMap::new();
- envs.insert(
- ENV_SOURCE_CONNECTION_STRING.to_owned(),
- connection_string.clone(),
- );
+ envs.insert(ENV_SOURCE_TABLES.to_owned(), format!("[{SOURCE_TABLE}]"));
+ envs.insert(ENV_SOURCE_INCLUDE_METADATA.to_owned(), "true".to_owned());
+ envs.insert(ENV_SOURCE_STREAMS_0_SCHEMA.to_owned(), "json".to_owned());
- let mut runtime = setup_runtime();
- runtime
- .init("postgres/source.toml", Some(envs), IggySetup::default())
- .await;
-
- PostgresTestSetup {
- runtime,
- connection_string,
- container,
- }
+ setup_source_with_envs(container, connection_string, envs).await
}
async fn setup_source_bytea() -> PostgresTestSetup {
@@ -179,24 +219,14 @@ async fn setup_source_bytea() -> PostgresTestSetup {
let mut envs = HashMap::new();
envs.insert(
- ENV_SOURCE_CONNECTION_STRING.to_owned(),
- connection_string.clone(),
+ ENV_SOURCE_TABLES.to_owned(),
+ format!("[{SOURCE_TABLE_BYTEA}]"),
);
+ envs.insert(ENV_SOURCE_PAYLOAD_COLUMN.to_owned(), "payload".to_owned());
+ envs.insert(ENV_SOURCE_PAYLOAD_FORMAT.to_owned(), "bytea".to_owned());
+ envs.insert(ENV_SOURCE_STREAMS_0_SCHEMA.to_owned(), "raw".to_owned());
- let mut runtime = setup_runtime();
- runtime
- .init(
- "postgres/source_bytea.toml",
- Some(envs),
- IggySetup::default(),
- )
- .await;
-
- PostgresTestSetup {
- runtime,
- connection_string,
- container,
- }
+ setup_source_with_envs(container, connection_string, envs).await
}
async fn setup_source_json() -> PostgresTestSetup {
@@ -216,24 +246,17 @@ async fn setup_source_json() -> PostgresTestSetup {
let mut envs = HashMap::new();
envs.insert(
- ENV_SOURCE_CONNECTION_STRING.to_owned(),
- connection_string.clone(),
+ ENV_SOURCE_TABLES.to_owned(),
+ format!("[{SOURCE_TABLE_JSON}]"),
);
+ envs.insert(ENV_SOURCE_PAYLOAD_COLUMN.to_owned(), "data".to_owned());
+ envs.insert(
+ ENV_SOURCE_PAYLOAD_FORMAT.to_owned(),
+ "json_direct".to_owned(),
+ );
+ envs.insert(ENV_SOURCE_STREAMS_0_SCHEMA.to_owned(), "json".to_owned());
- let mut runtime = setup_runtime();
- runtime
- .init(
- "postgres/source_json.toml",
- Some(envs),
- IggySetup::default(),
- )
- .await;
-
- PostgresTestSetup {
- runtime,
- connection_string,
- container,
- }
+ setup_source_with_envs(container, connection_string, envs).await
}
async fn setup_source_delete() -> PostgresTestSetup {
@@ -254,24 +277,15 @@ async fn setup_source_delete() -> PostgresTestSetup {
let mut envs = HashMap::new();
envs.insert(
- ENV_SOURCE_CONNECTION_STRING.to_owned(),
- connection_string.clone(),
+ ENV_SOURCE_TABLES.to_owned(),
+ format!("[{SOURCE_TABLE_DELETE}]"),
);
+ envs.insert(ENV_SOURCE_PRIMARY_KEY_COLUMN.to_owned(), "id".to_owned());
+ envs.insert(ENV_SOURCE_DELETE_AFTER_READ.to_owned(), "true".to_owned());
+ envs.insert(ENV_SOURCE_INCLUDE_METADATA.to_owned(), "true".to_owned());
+ envs.insert(ENV_SOURCE_STREAMS_0_SCHEMA.to_owned(), "json".to_owned());
- let mut runtime = setup_runtime();
- runtime
- .init(
- "postgres/source_delete.toml",
- Some(envs),
- IggySetup::default(),
- )
- .await;
-
- PostgresTestSetup {
- runtime,
- connection_string,
- container,
- }
+ setup_source_with_envs(container, connection_string, envs).await
}
async fn setup_source_mark() -> PostgresTestSetup {
@@ -293,15 +307,46 @@ async fn setup_source_mark() -> PostgresTestSetup {
let mut envs = HashMap::new();
envs.insert(
+ ENV_SOURCE_TABLES.to_owned(),
+ format!("[{SOURCE_TABLE_MARK}]"),
+ );
+ envs.insert(ENV_SOURCE_PRIMARY_KEY_COLUMN.to_owned(), "id".to_owned());
+ envs.insert(
+ ENV_SOURCE_PROCESSED_COLUMN.to_owned(),
+ "is_processed".to_owned(),
+ );
+ envs.insert(ENV_SOURCE_INCLUDE_METADATA.to_owned(), "true".to_owned());
+ envs.insert(ENV_SOURCE_STREAMS_0_SCHEMA.to_owned(), "json".to_owned());
+
+ setup_source_with_envs(container, connection_string, envs).await
+}
+
+async fn setup_source_with_envs(
+ container: ContainerAsync<postgres::Postgres>,
+ connection_string: String,
+ mut extra_envs: HashMap<String, String>,
+) -> PostgresTestSetup {
+ extra_envs.insert(
ENV_SOURCE_CONNECTION_STRING.to_owned(),
connection_string.clone(),
);
+ extra_envs.insert(ENV_SOURCE_TRACKING_COLUMN.to_owned(), "id".to_owned());
+ extra_envs.insert(
+ ENV_SOURCE_STREAMS_0_STREAM.to_owned(),
+ TEST_STREAM.to_owned(),
+ );
+    extra_envs.insert(ENV_SOURCE_STREAMS_0_TOPIC.to_owned(), TEST_TOPIC.to_owned());
+ extra_envs.insert(ENV_SOURCE_POLL_INTERVAL.to_owned(), "10ms".to_owned());
+ extra_envs.insert(
+ ENV_SOURCE_PATH.to_owned(),
+ "../../target/debug/libiggy_connector_postgres_source".to_owned(),
+ );
let mut runtime = setup_runtime();
runtime
.init(
- "postgres/source_mark.toml",
- Some(envs),
+ "postgres/source.toml",
+ Some(extra_envs),
IggySetup::default(),
)
.await;
diff --git a/core/integration/tests/connectors/postgres/sink.toml b/core/integration/tests/connectors/postgres/sink.toml
index 7336b47b1..61b3ddada 100644
--- a/core/integration/tests/connectors/postgres/sink.toml
+++ b/core/integration/tests/connectors/postgres/sink.toml
@@ -17,4 +17,4 @@
[connectors]
config_type = "local"
-config_dir = "tests/connectors/postgres/connectors_config"
+config_dir = "../connectors/sinks/postgres_sink"
diff --git a/core/integration/tests/connectors/postgres/sink_json.toml b/core/integration/tests/connectors/postgres/sink_json.toml
deleted file mode 100644
index 093005684..000000000
--- a/core/integration/tests/connectors/postgres/sink_json.toml
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[connectors]
-config_type = "local"
-config_dir = "tests/connectors/postgres/sink_json_config"
diff --git a/core/integration/tests/connectors/postgres/sink_json_config/sink.toml b/core/integration/tests/connectors/postgres/sink_json_config/sink.toml
deleted file mode 100644
index c55ff5b71..000000000
--- a/core/integration/tests/connectors/postgres/sink_json_config/sink.toml
+++ /dev/null
@@ -1,42 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-type = "sink"
-key = "postgres"
-enabled = true
-version = 0
-name = "Postgres sink json"
-path = "../../target/debug/libiggy_connector_postgres_sink"
-
-[[streams]]
-stream = "test_stream"
-topics = ["test_topic"]
-schema = "json"
-batch_length = 100
-poll_interval = "5ms"
-consumer_group = "test"
-
-[plugin_config]
-connection_string = ""
-target_table = "iggy_messages"
-batch_size = 100
-max_connections = 10
-auto_create_table = true
-include_metadata = true
-include_checksum = true
-include_origin_timestamp = true
-payload_format = "json"
diff --git a/core/integration/tests/connectors/postgres/sink_raw.toml b/core/integration/tests/connectors/postgres/sink_raw.toml
deleted file mode 100644
index da6ff86e5..000000000
--- a/core/integration/tests/connectors/postgres/sink_raw.toml
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[connectors]
-config_type = "local"
-config_dir = "tests/connectors/postgres/sink_raw_config"
diff --git a/core/integration/tests/connectors/postgres/sink_raw_config/sink.toml b/core/integration/tests/connectors/postgres/sink_raw_config/sink.toml
deleted file mode 100644
index 82b7bf400..000000000
--- a/core/integration/tests/connectors/postgres/sink_raw_config/sink.toml
+++ /dev/null
@@ -1,42 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-type = "sink"
-key = "postgres"
-enabled = true
-version = 0
-name = "Postgres sink raw"
-path = "../../target/debug/libiggy_connector_postgres_sink"
-
-[[streams]]
-stream = "test_stream"
-topics = ["test_topic"]
-schema = "raw"
-batch_length = 100
-poll_interval = "5ms"
-consumer_group = "test"
-
-[plugin_config]
-connection_string = ""
-target_table = "iggy_messages"
-batch_size = 100
-max_connections = 10
-auto_create_table = true
-include_metadata = true
-include_checksum = true
-include_origin_timestamp = true
-payload_format = "bytea"
diff --git a/core/integration/tests/connectors/postgres/source.toml b/core/integration/tests/connectors/postgres/source.toml
index 2da29f3f6..049329c08 100644
--- a/core/integration/tests/connectors/postgres/source.toml
+++ b/core/integration/tests/connectors/postgres/source.toml
@@ -17,4 +17,4 @@
[connectors]
config_type = "local"
-config_dir = "tests/connectors/postgres/source_config"
+config_dir = "../connectors/sources/postgres_source"
diff --git a/core/integration/tests/connectors/postgres/source_bytea.toml b/core/integration/tests/connectors/postgres/source_bytea.toml
deleted file mode 100644
index a815d1bae..000000000
--- a/core/integration/tests/connectors/postgres/source_bytea.toml
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[connectors]
-config_type = "local"
-config_dir = "tests/connectors/postgres/source_bytea_config"
diff --git a/core/integration/tests/connectors/postgres/source_bytea_config/source.toml b/core/integration/tests/connectors/postgres/source_bytea_config/source.toml
deleted file mode 100644
index c117ee82f..000000000
--- a/core/integration/tests/connectors/postgres/source_bytea_config/source.toml
+++ /dev/null
@@ -1,40 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-type = "source"
-key = "postgres"
-enabled = true
-version = 0
-name = "Postgres source bytea"
-path = "../../target/debug/libiggy_connector_postgres_source"
-
-[[streams]]
-stream = "test_stream"
-topic = "test_topic"
-schema = "raw"
-batch_length = 100
-
-[plugin_config]
-connection_string = ""
-mode = "polling"
-tables = ["test_payloads"]
-poll_interval = "5ms"
-batch_size = 100
-tracking_column = "id"
-max_connections = 10
-payload_column = "payload"
-payload_format = "bytea"
diff --git a/core/integration/tests/connectors/postgres/source_delete.toml b/core/integration/tests/connectors/postgres/source_delete.toml
deleted file mode 100644
index 52b7de053..000000000
--- a/core/integration/tests/connectors/postgres/source_delete.toml
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[connectors]
-config_type = "local"
-config_dir = "tests/connectors/postgres/source_delete_config"
diff --git a/core/integration/tests/connectors/postgres/source_delete_config/source.toml b/core/integration/tests/connectors/postgres/source_delete_config/source.toml
deleted file mode 100644
index 250508a8f..000000000
--- a/core/integration/tests/connectors/postgres/source_delete_config/source.toml
+++ /dev/null
@@ -1,41 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-type = "source"
-key = "postgres"
-enabled = true
-version = 0
-name = "Postgres source with delete after read"
-path = "../../target/debug/libiggy_connector_postgres_source"
-
-[[streams]]
-stream = "test_stream"
-topic = "test_topic"
-schema = "json"
-batch_length = 100
-
-[plugin_config]
-connection_string = ""
-mode = "polling"
-tables = ["test_delete_rows"]
-poll_interval = "5ms"
-batch_size = 100
-tracking_column = "id"
-primary_key_column = "id"
-delete_after_read = true
-max_connections = 10
-include_metadata = true
diff --git a/core/integration/tests/connectors/postgres/source_json.toml b/core/integration/tests/connectors/postgres/source_json.toml
deleted file mode 100644
index fa6d44610..000000000
--- a/core/integration/tests/connectors/postgres/source_json.toml
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[connectors]
-config_type = "local"
-config_dir = "tests/connectors/postgres/source_json_config"
diff --git a/core/integration/tests/connectors/postgres/source_json_config/source.toml b/core/integration/tests/connectors/postgres/source_json_config/source.toml
deleted file mode 100644
index 901776320..000000000
--- a/core/integration/tests/connectors/postgres/source_json_config/source.toml
+++ /dev/null
@@ -1,40 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-type = "source"
-key = "postgres"
-enabled = true
-version = 0
-name = "Postgres source json direct"
-path = "../../target/debug/libiggy_connector_postgres_source"
-
-[[streams]]
-stream = "test_stream"
-topic = "test_topic"
-schema = "json"
-batch_length = 100
-
-[plugin_config]
-connection_string = ""
-mode = "polling"
-tables = ["test_json_payloads"]
-poll_interval = "5ms"
-batch_size = 100
-tracking_column = "id"
-max_connections = 10
-payload_column = "data"
-payload_format = "json_direct"
diff --git a/core/integration/tests/connectors/postgres/source_mark.toml b/core/integration/tests/connectors/postgres/source_mark.toml
deleted file mode 100644
index 24143ac08..000000000
--- a/core/integration/tests/connectors/postgres/source_mark.toml
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[connectors]
-config_type = "local"
-config_dir = "tests/connectors/postgres/source_mark_config"
diff --git a/core/integration/tests/connectors/postgres/source_mark_config/source.toml b/core/integration/tests/connectors/postgres/source_mark_config/source.toml
deleted file mode 100644
index 51571e07b..000000000
--- a/core/integration/tests/connectors/postgres/source_mark_config/source.toml
+++ /dev/null
@@ -1,41 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-type = "source"
-key = "postgres"
-enabled = true
-version = 0
-name = "Postgres source with processed column"
-path = "../../target/debug/libiggy_connector_postgres_source"
-
-[[streams]]
-stream = "test_stream"
-topic = "test_topic"
-schema = "json"
-batch_length = 100
-
-[plugin_config]
-connection_string = ""
-mode = "polling"
-tables = ["test_mark_rows"]
-poll_interval = "5ms"
-batch_size = 100
-tracking_column = "id"
-primary_key_column = "id"
-processed_column = "is_processed"
-max_connections = 10
-include_metadata = true