spetz commented on code in PR #2191: URL: https://github.com/apache/iggy/pull/2191#discussion_r2372148591
########## core/connectors/sinks/iceberg_sink/iggy_iceberg_sink_config.toml: ########## @@ -0,0 +1,130 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +[http_api] # Optional HTTP API configuration +enabled = true +address = "127.0.0.1:8081" +# api_key = "secret" # Optional API key for authentication to be passed as `api-key` header + +[http_api.cors] # Optional CORS configuration for HTTP API +enabled = false +allowed_methods = ["GET", "POST", "PUT", "DELETE"] +allowed_origins = ["*"] +allowed_headers = ["content-type"] +exposed_headers = [""] +allow_credentials = false +allow_private_network = false + +[http_api.tls] # Optional TLS configuration for HTTP API +enabled = false +cert_file = "core/certs/iggy_cert.pem" +key_file = "core/certs/iggy_key.pem" + +[iggy] +address = "localhost:8090" +username = "iggy" +password = "iggy" +# token = "secret" # Personal Access Token (PAT) can be used instead of username and password + +[state] +path = "local_state" + +[sinks.iceberg] +enabled = true +name = "Iceberg sink" +path = "target/release/libiggy_connector_iceberg_sink" + +[[sinks.iceberg.streams]] +stream = "example_stream" +topics = ["example_topic"] +schema = "json" +batch_length = 100 +poll_interval = "5ms" +consumer_group = 
"iceberg_sink_connector" + +# Local S3 example +[sinks.iceberg.config] +tables = ["nyc.users"] +catalog_type = "rest" +warehouse = "warehouse" +uri = "http://localhost:8181" +dynamic_routing = true +dynamic_route_field = "db_table" +store_url = "http://localhost:9000" +store_access_key_id = "admin" +store_secret_access_key = "password" +store_region = "us-east-1" +store_class = "s3" + + +[sinks.iceberg.transforms.add_fields] +enabled = true + +[[sinks.iceberg.transforms.add_fields.fields]] +key = "db_table" +value.static = "nyc.users" + +# +# [sinks.stdout] +# enabled = true +# name = "Stdout sink" +# path = "target/release/libiggy_connector_stdout_sink" +# +# [[sinks.stdout.streams]] +# stream = "example_stream" +# topics = ["example_topic"] +# schema = "json" +# batch_length = 100 +# poll_interval = "5ms" +# consumer_group = "stdout_sink_connector" +# +# [sinks.stdout.config] +# print_payload = false +# +# [sinks.stdout.transforms.add_fields] +# enabled = true +# +# [[sinks.stdout.transforms.add_fields.fields]] +# key = "message" +# value.static = "hello" + +[sources.random] Review Comment: Maybe remove the whole sources section for example config? ########## core/connectors/sinks/iceberg_sink/iggy_iceberg_sink_config.toml: ########## @@ -0,0 +1,130 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + + +[http_api] # Optional HTTP API configuration +enabled = true +address = "127.0.0.1:8081" +# api_key = "secret" # Optional API key for authentication to be passed as `api-key` header Review Comment: Keep this as `api_key = ""` (not commented out) ########## core/connectors/sinks/iceberg_sink/iggy_iceberg_sink_config.toml: ########## @@ -0,0 +1,130 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ + +[http_api] # Optional HTTP API configuration +enabled = true +address = "127.0.0.1:8081" +# api_key = "secret" # Optional API key for authentication to be passed as `api-key` header + +[http_api.cors] # Optional CORS configuration for HTTP API +enabled = false +allowed_methods = ["GET", "POST", "PUT", "DELETE"] +allowed_origins = ["*"] +allowed_headers = ["content-type"] +exposed_headers = [""] +allow_credentials = false +allow_private_network = false + +[http_api.tls] # Optional TLS configuration for HTTP API +enabled = false +cert_file = "core/certs/iggy_cert.pem" +key_file = "core/certs/iggy_key.pem" + +[iggy] +address = "localhost:8090" +username = "iggy" +password = "iggy" +# token = "secret" # Personal Access Token (PAT) can be used instead of username and password + +[state] +path = "local_state" + +[sinks.iceberg] +enabled = true +name = "Iceberg sink" +path = "target/release/libiggy_connector_iceberg_sink" + +[[sinks.iceberg.streams]] +stream = "example_stream" +topics = ["example_topic"] +schema = "json" +batch_length = 100 +poll_interval = "5ms" +consumer_group = "iceberg_sink_connector" + +# Local S3 example +[sinks.iceberg.config] +tables = ["nyc.users"] +catalog_type = "rest" +warehouse = "warehouse" +uri = "http://localhost:8181" +dynamic_routing = true +dynamic_route_field = "db_table" +store_url = "http://localhost:9000" +store_access_key_id = "admin" +store_secret_access_key = "password" +store_region = "us-east-1" +store_class = "s3" + + +[sinks.iceberg.transforms.add_fields] +enabled = true + +[[sinks.iceberg.transforms.add_fields.fields]] +key = "db_table" +value.static = "nyc.users" + +# +# [sinks.stdout] Review Comment: Let's remove commented out stdout sink. ########## core/connectors/sinks/iceberg_sink/src/sink.rs: ########## @@ -0,0 +1,91 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::{ + IcebergSink, + catalog::init_catalog, + router::{dynamic_router::DynamicRouter, static_router::StaticRouter}, +}; +use async_trait::async_trait; +use iceberg::Catalog; +use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Sink, TopicMetadata}; +use tracing::{debug, error, info}; + +#[async_trait] +impl Sink for IcebergSink { + async fn open(&mut self) -> Result<(), Error> { + info!( + "Opened Iceberg sink connector with ID: {} for URL: {}", Review Comment: We could also log redacted values of the access key ID and secret here, e.g. by taking just the first few characters and adding them to the log. This is especially helpful when you want to make sure that the provided credentials are valid. ########## core/connectors/sinks/iceberg_sink/src/router/mod.rs: ########## @@ -0,0 +1,221 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::router::arrow_streamer::JsonArrowReader; +use crate::slice_user_table; +use arrow_json::ReaderBuilder; +use async_trait::async_trait; +use iceberg::TableIdent; +use iceberg::arrow::schema_to_arrow_schema; +use iceberg::spec::{Literal, PrimitiveLiteral, PrimitiveType, Struct, StructType}; +use iceberg::table::Table; +use iceberg::transaction::{ApplyTransactionAction, Transaction}; +use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; +use iceberg::writer::file_writer::ParquetWriterBuilder; +use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; +use iceberg::{ + Catalog, + writer::file_writer::location_generator::{DefaultFileNameGenerator, DefaultLocationGenerator}, +}; +use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload, Schema}; +use parquet::file::properties::WriterProperties; +use std::sync::Arc; +use tracing::{error, warn}; +use uuid::Uuid; + +mod arrow_streamer; +pub mod dynamic_router; +pub mod static_router; + +pub fn is_valid_namespaced_table(input: &str) -> bool { + let parts: Vec<&str> = input.split('.').collect(); + parts.len() >= 2 && parts.iter().all(|part| !part.is_empty()) +} + +async fn table_exists(route_field_val: &str, catalog: &dyn Catalog) -> Option<Table> { + let sliced_table = slice_user_table(route_field_val); + let table_ident = TableIdent::from_strs(&sliced_table).ok()?; + + catalog.load_table(&table_ident).await.ok() +} + +pub fn primitive_type_to_literal(pt: &PrimitiveType) -> Result<PrimitiveLiteral, Error> { + match pt { + PrimitiveType::Boolean => 
Ok(PrimitiveLiteral::Boolean(false)), + PrimitiveType::Int => Ok(PrimitiveLiteral::Int(0)), + PrimitiveType::Long => Ok(PrimitiveLiteral::Long(0)), + PrimitiveType::Decimal { .. } => Ok(PrimitiveLiteral::Int128(0)), + PrimitiveType::Date => Ok(PrimitiveLiteral::Int(0)), // e.g. days since epoch + PrimitiveType::Time => Ok(PrimitiveLiteral::Long(0)), // microseconds since midnight + PrimitiveType::Timestamp => Ok(PrimitiveLiteral::Long(0)), // microseconds since epoch + PrimitiveType::Timestamptz => Ok(PrimitiveLiteral::Long(0)), + PrimitiveType::TimestampNs => Ok(PrimitiveLiteral::Long(0)), + PrimitiveType::TimestamptzNs => Ok(PrimitiveLiteral::Long(0)), + PrimitiveType::String => Ok(PrimitiveLiteral::String(String::new())), + PrimitiveType::Uuid => Ok(PrimitiveLiteral::Binary(vec![0; 16])), + PrimitiveType::Fixed(len) => Ok(PrimitiveLiteral::Binary(vec![0; *len as usize])), + PrimitiveType::Binary => Ok(PrimitiveLiteral::Binary(Vec::new())), + _ => { + error!("Partition type not supported"); + Err(Error::InvalidConfig) + } + } +} + +fn get_partition_type_value(default_partition_type: &StructType) -> Result<Option<Struct>, Error> { + let mut fields: Vec<Option<Literal>> = Vec::new(); + + if default_partition_type.fields().is_empty() { + return Ok(None); + }; + + for field in default_partition_type.fields() { + let t = field.field_type.as_primitive_type().ok_or_else(|| { Review Comment: Maybe let's call this variable like `field_type` (for example), something a bit more meaningful than plain `t`. ########## core/connectors/sinks/iceberg_sink/src/router/dynamic_router.rs: ########## @@ -0,0 +1,169 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::router::{Router, is_valid_namespaced_table, table_exists, write_data}; +use async_trait::async_trait; +use iceberg::Catalog; +use iceberg::table::Table; +use iggy_connector_sdk::{ConsumedMessage, Error, MessagesMetadata, Payload}; +use simd_json::base::ValueAsObject; +use std::collections::HashMap; +use tracing::{info, warn}; + +#[derive(Debug)] +pub struct DynamicRouter { + catalog: Box<dyn Catalog>, + route_field: String, +} + +pub struct DynamicWriter { + pub tables_to_write: HashMap<String, Table>, + pub table_to_message: HashMap<String, Vec<ConsumedMessage>>, +} + +impl DynamicWriter { + pub fn new() -> Self { + let tables_to_write = HashMap::new(); + let table_to_message = HashMap::new(); + Self { + tables_to_write, + table_to_message, + } + } + + fn push_to_existing( + &mut self, + route_field_val: &str, + message: ConsumedMessage, + ) -> Option<ConsumedMessage> { + if let Some(message_vec) = self.table_to_message.get_mut(route_field_val) { + message_vec.push(message); + None + } else { + Some(message) + } + } + + async fn load_table_if_exists( + &mut self, + route_field_val: &str, + catalog: &dyn Catalog, + ) -> Result<(), Error> { + let table = table_exists(route_field_val, catalog) + .await + .ok_or(Error::InvalidState)?; + + self.tables_to_write + .insert(route_field_val.to_string(), table); + + Ok(()) + } +} + +impl DynamicRouter { + pub fn new(catalog: 
Box<dyn Catalog>, route_field: String) -> Self { + Self { + catalog, + route_field, + } + } + + fn extract_route_field(&self, message: &ConsumedMessage) -> Option<String> { + match &message.payload { + Payload::Json(payload) => payload + .as_object() + .and_then(|obj| obj.get(&self.route_field)) + .map(|val| val.to_string()), + _ => { + warn!("Unsupported format for iceberg connector"); + None + } + } + } +} + +#[async_trait] +impl Router for DynamicRouter { + async fn route_data( + &self, + messages_metadata: MessagesMetadata, + messages: Vec<ConsumedMessage>, + ) -> Result<(), crate::Error> { + let mut writer = DynamicWriter::new(); + for message in messages { + let route_field_val = match self.extract_route_field(&message) { + Some(val) => val, + None => continue, + }; + + let message = match writer.push_to_existing(&route_field_val, message) { + Some(msg) => msg, + None => continue, + }; + + if !is_valid_namespaced_table(&route_field_val) { + warn!( + "Found invalid route field name on message: {}. Route fields should have at least 1 namespace separated by '.' character before the table", + route_field_val Review Comment: Let's use inline string interpolation here, i.e. put `{route_field_val}` directly in the format string instead of passing it as a separate argument. ########## core/connectors/sinks/iceberg_sink/src/router/arrow_streamer.rs: ########## @@ -0,0 +1,79 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use simd_json::OwnedValue; +use std::io::{self, BufRead, Cursor, Read}; +use std::slice::Iter; + +pub struct JsonArrowReader<'a> { + values: Iter<'a, &'a OwnedValue>, + cursor: Cursor<Vec<u8>>, +} + +impl<'a> JsonArrowReader<'a> { + pub fn new(values: &'a [&OwnedValue]) -> Self { + Self { + values: values.iter(), + cursor: Cursor::new(Vec::new()), + } + } + + fn load_next(&mut self) -> io::Result<bool> { + if let Some(val) = self.values.next() { Review Comment: You could also use a `let Some(val) = self.values.next() else { return Ok(false); };` block, which might be a bit more readable for extended logic :) ########## core/connectors/sinks/iceberg_sink/iggy_iceberg_sink_config.toml: ########## @@ -0,0 +1,130 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + Review Comment: Please adjust this file for the new config format, where each sink/source has its own dedicated configuration file with specified `id`, `type` and `version`. New example configs are kept under `runtime/example_config/connectors`. -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
