hubcio commented on code in PR #2579:
URL: https://github.com/apache/iggy/pull/2579#discussion_r2703608731
##########
core/connectors/sinks/postgres_sink/src/lib.rs:
##########
@@ -104,43 +178,41 @@ impl PostgresSink {
return Ok(());
}
- let pool = self
- .pool
- .as_ref()
- .ok_or_else(|| Error::InitError("Database not
connected".to_string()))?;
+ let pool = self.get_pool()?;
let table_name = &self.config.target_table;
let include_metadata = self.config.include_metadata.unwrap_or(true);
let include_checksum = self.config.include_checksum.unwrap_or(true);
let include_origin_timestamp =
self.config.include_origin_timestamp.unwrap_or(true);
+ let payload_type = self.payload_format().sql_type();
- let mut create_table_sql = format!("CREATE TABLE IF NOT EXISTS
{table_name} (");
- create_table_sql.push_str("id DECIMAL(39, 0) PRIMARY KEY");
+ let mut sql = format!("CREATE TABLE IF NOT EXISTS {table_name} (");
Review Comment:
SQL injection is possible here; you need to use quote_identifier(), like you
did in the source connector.
##########
core/connectors/sinks/postgres_sink/src/lib.rs:
##########
@@ -254,55 +349,351 @@ impl PostgresSink {
}
if include_origin_timestamp {
- let origin_timestamp = DateTime::from_timestamp(
- (message.origin_timestamp / 1_000_000) as i64,
- ((message.origin_timestamp % 1_000_000) * 1_000) as u32,
- )
- .unwrap_or_else(Utc::now);
query_obj = query_obj.bind(origin_timestamp);
}
- query_obj = query_obj.bind(payload_bytes);
+ query_obj = match payload_format {
+ PayloadFormat::Bytea => query_obj.bind(payload_bytes.to_vec()),
+ PayloadFormat::Json => query_obj.bind(json_value.clone()),
+ PayloadFormat::Text => query_obj.bind(text_value.clone()),
+ };
+
+ match query_obj.execute(pool).await {
+ Ok(_) => return Ok(()),
+ Err(e) => {
+ attempts += 1;
+ handle_retry_error(e, attempts, max_retries)?;
+ tokio::time::sleep(retry_delay * attempts).await;
+ }
+ }
+ }
+ }
- query_obj.execute(pool).await.map_err(|e| {
- error!("Failed to insert message: {}", e);
- Error::InvalidRecord
- })?;
+ fn get_pool(&self) -> Result<&Pool<Postgres>, Error> {
+ self.pool
+ .as_ref()
+ .ok_or_else(|| Error::InitError("Database not
connected".to_string()))
+ }
+
+ fn payload_format(&self) -> PayloadFormat {
+ PayloadFormat::from_config(self.config.payload_format.as_deref())
+ }
+
+ fn get_max_retries(&self) -> u32 {
+ self.config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES)
+ }
+
+ fn get_retry_delay(&self) -> Duration {
+ let delay_str = self
+ .config
+ .retry_delay
+ .as_deref()
+ .unwrap_or(DEFAULT_RETRY_DELAY);
+ HumanDuration::from_str(delay_str)
+ .unwrap_or_else(|e| panic!("Invalid retry_delay '{delay_str}':
{e}"))
+ .into()
+ }
Review Comment:
Please don't panic here; it will crash the whole connector. Maybe return an invalid-config error instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]