chiradip commented on code in PR #2175:
URL: https://github.com/apache/iggy/pull/2175#discussion_r2361312529


##########
core/connectors/sources/flink_source/src/flink_reader.rs:
##########
@@ -0,0 +1,384 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::config::{FlinkSourceConfig, SourceType};
+use reqwest::Client;
+use serde::{Deserialize, Serialize};
+use serde_json::Value as JsonValue;
+use std::collections::HashMap;
+use std::time::Duration;
+use thiserror::Error;
+use tracing::{debug, error, info};
+use uuid::Uuid;
+
/// Errors produced by the Flink reader while talking to the cluster's REST API.
#[derive(Debug, Error)]
pub enum FlinkReaderError {
    /// The cluster was reachable but replied with a non-success HTTP status
    /// during the connectivity check.
    #[error("Connection failed: {0}")]
    ConnectionError(String),

    /// No running job with a matching source vertex could be found.
    #[error("Subscription failed: {0}")]
    SubscriptionError(String),

    /// A REST request (e.g. listing jobs) returned a non-success status.
    #[error("Fetch failed: {0}")]
    FetchError(String),

    /// A REST response body did not have the expected JSON shape.
    #[error("Deserialization failed: {0}")]
    DeserializationError(String),

    /// Transport-level failure from the underlying HTTP client.
    #[error("HTTP error: {0}")]
    HttpError(#[from] reqwest::Error),

    /// JSON (de)serialization failure from serde_json.
    #[error("JSON error: {0}")]
    JsonError(#[from] serde_json::Error),

    /// The job detail endpoint returned a non-success status for this job id.
    #[error("Job not found: {0}")]
    JobNotFound(String),

    /// The job is in a state that does not permit the requested operation.
    #[error("Invalid job state: {0}")]
    InvalidJobState(String),
}
+
/// A single message read from a Flink source, normalized into the
/// connector's message shape.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlinkMessage {
    /// Unique identifier assigned to this message.
    pub id: Uuid,
    /// Timestamp of the message; unit (seconds vs. milliseconds) is not
    /// established in this file — confirm against the code that fills it.
    pub timestamp: u64,
    /// Original timestamp from the upstream system, when available.
    pub origin_timestamp: Option<u64>,
    /// Arbitrary string headers attached to the message.
    pub headers: HashMap<String, String>,
    /// Message payload as parsed JSON.
    pub data: JsonValue,
    /// Checksum of the payload; algorithm not shown here — verify at the
    /// producing site.
    pub checksum: u64,
    /// Partition in the upstream source (e.g. Kafka partition), if known.
    pub source_partition: Option<i32>,
    /// Offset within the upstream source partition, if known.
    pub source_offset: Option<u64>,
}
+
/// Subset of the Flink REST `/overview` response used by the connector.
/// Field names are renamed to match Flink's hyphenated JSON keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterOverview {
    /// Flink version string reported by the cluster (`flink-version`).
    #[serde(rename = "flink-version")]
    pub flink_version: String,
    /// Git commit the cluster build was produced from (`flink-commit`).
    #[serde(rename = "flink-commit")]
    pub flink_commit: String,
    /// Number of jobs currently in the RUNNING state (`jobs-running`).
    #[serde(rename = "jobs-running")]
    pub jobs_running: u32,
    /// Number of registered task managers (`taskmanagers`).
    #[serde(rename = "taskmanagers")]
    pub taskmanagers: u32,
}
+
/// A single vertex (operator) of a Flink job, as returned by the
/// `/v1/jobs/{id}` endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobVertex {
    /// Vertex id within the job.
    pub id: String,
    /// Human-readable operator name; used for source matching.
    pub name: String,
    /// Configured parallelism of the vertex.
    pub parallelism: u32,
    /// Current execution status of the vertex.
    pub status: String,
    /// Aggregated I/O metrics, when the REST response includes them.
    pub metrics: Option<VertexMetrics>,
}
+
/// Aggregated read/write metrics for a job vertex. Field names are renamed
/// to match Flink's hyphenated JSON metric keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VertexMetrics {
    /// Records read by the vertex (`read-records`).
    #[serde(rename = "read-records")]
    pub read_records: u64,
    /// Records written by the vertex (`write-records`).
    #[serde(rename = "write-records")]
    pub write_records: u64,
    /// Bytes read by the vertex (`read-bytes`).
    #[serde(rename = "read-bytes")]
    pub read_bytes: u64,
    /// Bytes written by the vertex (`write-bytes`).
    #[serde(rename = "write-bytes")]
    pub write_bytes: u64,
}
+
/// Reads data from a Flink cluster through its REST API, tracking which
/// job and source vertex the connector is subscribed to.
#[derive(Debug)]
pub struct FlinkReader {
    // HTTP client shared by all REST calls; timeout set from config.
    client: Client,
    // Connector configuration (cluster URL, source type/identifier, timeouts).
    config: FlinkSourceConfig,
    // Cluster URL with any trailing '/' stripped, ready for path concatenation.
    base_url: String,
    // Subscription id. NOTE(review): currently only assigned on the
    // subscription *failure* path in `subscribe_to_source` — looks
    // unintended; verify.
    subscription_id: Option<String>,
    // Id of the running job matched during subscription, if any.
    active_job_id: Option<String>,
    // Id of the matched source vertex inside the active job, if any.
    source_vertex_id: Option<String>,
}
+
+impl FlinkReader {
+    pub fn new(config: FlinkSourceConfig) -> Self {
+        let client = Client::builder()
+            .timeout(Duration::from_secs(config.connection_timeout_secs))
+            .build()
+            .expect("Failed to build HTTP client");
+
+        let base_url = 
config.flink_cluster_url.trim_end_matches('/').to_string();
+
+        FlinkReader {
+            client,
+            config,
+            base_url,
+            subscription_id: None,
+            active_job_id: None,
+            source_vertex_id: None,
+        }
+    }
+
+    pub async fn check_connection(&self) -> Result<ClusterOverview, 
FlinkReaderError> {
+        let url = format!("{}/v1/overview", self.base_url);
+        let response = self.client.get(&url).send().await?;
+
+        if !response.status().is_success() {
+            return Err(FlinkReaderError::ConnectionError(format!(
+                "Cluster returned status: {}",
+                response.status()
+            )));
+        }
+
+        let overview: ClusterOverview = response.json().await?;
+        info!(
+            "Connected to Flink cluster v{} with {} taskmanagers and {} 
running jobs",
+            overview.flink_version, overview.taskmanagers, 
overview.jobs_running
+        );
+        Ok(overview)
+    }
+
+    pub async fn subscribe_to_source(&mut self) -> Result<(), 
FlinkReaderError> {
+        // Find a suitable job that has the source we're looking for
+        let jobs = self.list_running_jobs().await?;
+
+        for job in jobs {
+            if let Ok(vertices) = self.get_job_vertices(&job).await {
+                // Look for a source vertex that matches our configuration
+                for vertex in vertices {
+                    if self.is_matching_source(&vertex) {
+                        self.active_job_id = Some(job.clone());
+                        self.source_vertex_id = Some(vertex.id.clone());
+                        info!("Subscribed to source vertex {} in job {}", 
vertex.name, job);
+                        return Ok(());
+                    }
+                }
+            }
+        }
+
+        // If no matching job found, return a subscription error
+        self.subscription_id = Some(Uuid::new_v4().to_string());
+        Err(FlinkReaderError::SubscriptionError(format!(
+            "No matching Flink job found for source: {}",
+            self.config.source_identifier
+        )))
+    }
+
+    async fn list_running_jobs(&self) -> Result<Vec<String>, FlinkReaderError> 
{
+        let url = format!("{}/v1/jobs", self.base_url);
+        let response = self.client.get(&url).send().await?;
+
+        if !response.status().is_success() {
+            return Err(FlinkReaderError::FetchError(format!(
+                "Failed to list jobs: {}",
+                response.status()
+            )));
+        }
+
+        let body: JsonValue = response.json().await?;
+        let jobs = body["jobs"].as_array().ok_or_else(|| {
+            FlinkReaderError::DeserializationError("Invalid jobs 
response".to_string())
+        })?;
+
+        let running_jobs: Vec<String> = jobs
+            .iter()
+            .filter_map(|j| {
+                if j["state"].as_str() == Some("RUNNING") {
+                    j["id"].as_str().map(|s| s.to_string())
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        Ok(running_jobs)
+    }
+
+    async fn get_job_vertices(&self, job_id: &str) -> Result<Vec<JobVertex>, 
FlinkReaderError> {
+        let url = format!("{}/v1/jobs/{}", self.base_url, job_id);
+        let response = self.client.get(&url).send().await?;
+
+        if !response.status().is_success() {
+            return Err(FlinkReaderError::JobNotFound(job_id.to_string()));
+        }
+
+        let body: JsonValue = response.json().await?;
+        let vertices = body["vertices"].as_array().ok_or_else(|| {
+            FlinkReaderError::DeserializationError("Invalid job 
vertices".to_string())
+        })?;
+
+        let vertex_list: Result<Vec<JobVertex>, _> = vertices
+            .iter()
+            .map(|v| serde_json::from_value(v.clone()))
+            .collect();
+
+        vertex_list.map_err(FlinkReaderError::JsonError)
+    }
+
+    fn is_matching_source(&self, vertex: &JobVertex) -> bool {
+        // Check if this vertex is a source operator matching our configuration
+        let vertex_name = vertex.name.to_lowercase();
+        let source_id = self.config.source_identifier.to_lowercase();
+
+        match &self.config.source_type {
+            SourceType::Kafka => {
+                vertex_name.contains("kafka")
+                    && (vertex_name.contains(&source_id) || 
source_id.is_empty())
+            }
+            SourceType::Kinesis => {
+                vertex_name.contains("kinesis")
+                    && (vertex_name.contains(&source_id) || 
source_id.is_empty())
+            }
+            SourceType::FileSystem => {
+                vertex_name.contains("file")
+                    && (vertex_name.contains(&source_id) || 
source_id.is_empty())
+            }
+            SourceType::Custom(custom_type) => {
+                vertex_name.contains(custom_type)
+                    && (vertex_name.contains(&source_id) || 
source_id.is_empty())
+            }
+            _ => false,
+        }
+    }
+
+    pub async fn fetch_batch(
+        &self,
+        offset: u64,
+        batch_size: usize,
+    ) -> Result<Vec<FlinkMessage>, FlinkReaderError> {
+        // In a real implementation, this would:

Review Comment:
   The endpoint `{}/v1/jobs/{}/vertices/{}/metrics` is real, albeit limited in the information it provides. I will drill into it further once the overall structure is in place.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to