hubcio commented on code in PR #2685:
URL: https://github.com/apache/iggy/pull/2685#discussion_r2786304116


##########
core/common/src/lib.rs:
##########
@@ -108,5 +108,8 @@ pub use 
utils::personal_access_token_expiry::PersonalAccessTokenExpiry;
 pub use utils::random_id;
 pub use utils::text;
 pub use utils::timestamp::*;
+
+// Re-export chrono types for connectors and other crates that need DateTime

Review Comment:
   remove this comment



##########
core/integration/tests/connectors/fixtures/elasticsearch/sink.rs:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::container::{
+    DEFAULT_TEST_STREAM, DEFAULT_TEST_TOPIC, ENV_SINK_INDEX, ENV_SINK_PATH,
+    ENV_SINK_STREAMS_0_CONSUMER_GROUP, ENV_SINK_STREAMS_0_SCHEMA, 
ENV_SINK_STREAMS_0_STREAM,
+    ENV_SINK_STREAMS_0_TOPICS, ENV_SINK_URL, ElasticsearchContainer, 
ElasticsearchOps,
+    ElasticsearchSearchResponse, HEALTH_CHECK_ATTEMPTS, 
HEALTH_CHECK_INTERVAL_MS,
+    create_http_client,
+};
+use async_trait::async_trait;
+use integration::harness::{TestBinaryError, TestFixture};
+use reqwest_middleware::ClientWithMiddleware as HttpClient;
+use std::collections::HashMap;
+use std::time::Duration;
+use tokio::time::sleep;
+use tracing::info;
+
+const SINK_INDEX: &str = "iggy_messages";
+const POLL_ATTEMPTS: usize = 100;
+const POLL_INTERVAL_MS: u64 = 50;
+
+pub struct ElasticsearchSinkFixture {
+    container: ElasticsearchContainer,
+    http_client: HttpClient,
+}
+
+impl ElasticsearchOps for ElasticsearchSinkFixture {
+    fn container(&self) -> &ElasticsearchContainer {
+        &self.container
+    }
+
+    fn http_client(&self) -> &HttpClient {
+        &self.http_client
+    }
+}
+
+impl ElasticsearchSinkFixture {
+    pub async fn get_document_count(&self) -> Result<usize, TestBinaryError> {
+        self.count_documents(SINK_INDEX).await
+    }
+
+    pub async fn wait_for_documents(
+        &self,
+        expected_count: usize,
+    ) -> Result<usize, TestBinaryError> {
+        for _ in 0..POLL_ATTEMPTS {
+            match self.count_documents(SINK_INDEX).await {
+                Ok(count) if count >= expected_count => {
+                    info!("Found {count} documents in Elasticsearch (expected 
{expected_count})");
+                    return Ok(count);
+                }
+                Ok(_) => {}
+                Err(_) => {}
+            }
+            sleep(Duration::from_millis(POLL_INTERVAL_MS)).await;
+        }
+
+        let final_count = self.count_documents(SINK_INDEX).await.unwrap_or(0);
+        Err(TestBinaryError::InvalidState {
+            message: format!(
+                "Expected at least {expected_count} documents, found 
{final_count} after {POLL_ATTEMPTS} attempts"
+            ),
+        })
+    }
+
+    pub async fn search_documents(&self) -> 
Result<ElasticsearchSearchResponse, TestBinaryError> {
+        self.search_all(SINK_INDEX).await
+    }
+
+    pub async fn refresh_index(&self) -> Result<(), TestBinaryError> {
+        ElasticsearchOps::refresh_index(self, SINK_INDEX).await
+    }
+}
+
+#[async_trait]
+impl TestFixture for ElasticsearchSinkFixture {
+    async fn setup() -> Result<Self, TestBinaryError> {
+        let container = ElasticsearchContainer::start().await?;
+        let http_client = create_http_client();
+
+        let fixture = Self {
+            container,
+            http_client,
+        };
+
+        for _ in 0..HEALTH_CHECK_ATTEMPTS {
+            let url = format!("{}/_cluster/health", 
fixture.container.base_url);
+            if let Ok(response) = fixture.http_client.get(&url).send().await
+                && response.status().is_success()
+            {
+                info!("Elasticsearch cluster is healthy");
+                break;
+            }
+            sleep(Duration::from_millis(HEALTH_CHECK_INTERVAL_MS)).await;
+        }

Review Comment:
   If this loop exhausts all attempts, `Ok(fixture)` is still returned — a failed health check should propagate as an error instead of silently returning the fixture.



##########
core/server/src/shard/system/consumer_offsets.rs:
##########
@@ -69,16 +69,28 @@ impl IggyShard {
     ) -> Result<Option<ConsumerOffsetInfo>, IggyError> {
         let (stream, topic) = self.resolve_topic_id(stream_id, topic_id)?;
 
-        let Some((polling_consumer, partition_id)) = 
self.resolve_consumer_with_partition_id(
-            stream_id,
-            topic_id,
-            &consumer,
-            client_id,
-            partition_id,
-            false,
-        )?
-        else {
-            return Err(IggyError::NotResolvedConsumer(consumer.id));
+        let (polling_consumer, partition_id) = match consumer.kind {
+            ConsumerKind::Consumer => {
+                let Some((polling_consumer, partition_id)) = self
+                    .resolve_consumer_with_partition_id(
+                        stream_id,
+                        topic_id,
+                        &consumer,
+                        client_id,
+                        partition_id,
+                        false,
+                    )?
+                else {
+                    return 
Err(IggyError::NotResolvedConsumer(consumer.id.clone()));
+                };
+                (polling_consumer, partition_id)
+            }
+            ConsumerKind::ConsumerGroup => {
+                let (_, _, cg_id) =
+                    self.resolve_consumer_group_id(stream_id, topic_id, 
&consumer.id)?;
+                let partition_id = partition_id.unwrap_or(0) as usize;
+                (PollingConsumer::consumer_group(cg_id, 0), partition_id)
+            }

Review Comment:
   I think this is a critical problem: the old code enforced group membership via 
resolve_consumer_with_partition_id → get_consumer_group_member_id, returning 
`ConsumerGroupMemberNotFound` for non-members. The new code bypasses that check 
entirely for ConsumerKind::ConsumerGroup and only verifies that the group exists 
via resolve_consumer_group_id.
   
   Or am I wrong?



##########
core/integration/tests/connectors/fixtures/iceberg/container.rs:
##########
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use integration::harness::{TestBinaryError, TestFixture};
+use reqwest_middleware::ClientWithMiddleware as HttpClient;
+use reqwest_retry::RetryTransientMiddleware;
+use reqwest_retry::policies::ExponentialBackoff;
+use std::collections::HashMap;
+use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor};
+use testcontainers_modules::testcontainers::runners::AsyncRunner;
+use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, 
ImageExt};
+use tracing::info;
+use uuid::Uuid;
+
+const MINIO_IMAGE: &str = "minio/minio";
+const MINIO_TAG: &str = "latest";

Review Comment:
   Don't use `latest` — pin a specific version. We don't want to break our CI when they 
release a new version.



##########
core/connectors/sources/postgres_source/src/lib.rs:
##########
@@ -1293,4 +1329,79 @@ mod tests {
         let redacted = redact_connection_string(conn);
         assert_eq!(redacted, "postgresql://adm***");
     }
+
+    #[test]
+    fn given_persisted_state_should_restore_tracking_offsets() {
+        let state = State {
+            last_poll_time: Utc::now(),
+            tracking_offsets: HashMap::from([
+                ("users".to_string(), "100".to_string()),
+                ("orders".to_string(), "2024-01-15T10:30:00Z".to_string()),
+            ]),
+            processed_rows: 500,
+        };
+
+        let connector_state =
+            ConnectorState::serialize(&state, "test", 1).expect("Failed to 
serialize state");
+
+        let src = PostgresSource::new(1, test_config(), Some(connector_state));
+
+        let runtime = tokio::runtime::Runtime::new().unwrap();
+        runtime.block_on(async {
+            let restored = src.state.lock().await;
+            assert_eq!(
+                restored.tracking_offsets.get("users"),
+                Some(&"100".to_string())
+            );
+            assert_eq!(
+                restored.tracking_offsets.get("orders"),
+                Some(&"2024-01-15T10:30:00Z".to_string())
+            );
+            assert_eq!(restored.processed_rows, 500);
+        });
+    }
+
+    #[test]
+    fn given_no_state_should_start_fresh() {
+        let src = PostgresSource::new(1, test_config(), None);
+
+        let runtime = tokio::runtime::Runtime::new().unwrap();
+        runtime.block_on(async {
+            let state = src.state.lock().await;
+            assert!(state.tracking_offsets.is_empty());
+            assert_eq!(state.processed_rows, 0);
+        });
+    }
+
+    #[test]
+    fn given_invalid_state_should_start_fresh() {
+        let invalid_state = ConnectorState(b"not valid json".to_vec());
+        let src = PostgresSource::new(1, test_config(), Some(invalid_state));
+
+        let runtime = tokio::runtime::Runtime::new().unwrap();
+        runtime.block_on(async {
+            let state = src.state.lock().await;
+            assert!(state.tracking_offsets.is_empty());
+            assert_eq!(state.processed_rows, 0);
+        });
+    }
+
+    #[test]
+    fn state_should_be_serializable_and_deserializable() {
+        let original = State {
+            last_poll_time: 
DateTime::parse_from_rfc3339("2024-01-15T10:30:00Z")
+                .unwrap()
+                .with_timezone(&Utc),
+            tracking_offsets: HashMap::from([("table1".to_string(), 
"42".to_string())]),
+            processed_rows: 1000,
+        };
+
+        let serialized = serde_json::to_vec(&original).expect("Failed to 
serialize");
+        let deserialized: State =
+            serde_json::from_slice(&serialized).expect("Failed to 
deserialize");
+
+        assert_eq!(original.last_poll_time, deserialized.last_poll_time);
+        assert_eq!(original.tracking_offsets, deserialized.tracking_offsets);
+        assert_eq!(original.processed_rows, deserialized.processed_rows);
+    }

Review Comment:
   I'm not a fan of this test: in production we use msgpack, but here we use 
serde_json. Please validate the actual state serialization path, not an unrelated 
serde_json round-trip like this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to