mmodzelewski commented on code in PR #2499:
URL: https://github.com/apache/iggy/pull/2499#discussion_r2639284988


##########
foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyConsumerFactory.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.consumer;
+
+import org.apache.iggy.connector.pinot.config.IggyStreamConfig;
+import org.apache.iggy.connector.pinot.metadata.IggyStreamMetadataProvider;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+
+/**
+ * Factory for creating Iggy stream consumers and metadata providers.
+ * This is the main entry point for Pinot's stream ingestion framework to 
interact with Iggy.
+ *
+ * <p>Configuration in Pinot table config:
+ * <pre>{@code
+ * "streamConfigs": {
+ *   "streamType": "iggy",
+ *   "stream.iggy.consumer.factory.class.name": 
"org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory",
+ *   "stream.iggy.host": "localhost",
+ *   "stream.iggy.port": "8090",
+ *   "stream.iggy.username": "iggy",
+ *   "stream.iggy.password": "iggy",
+ *   "stream.iggy.stream.id": "my-stream",
+ *   "stream.iggy.topic.id": "my-topic",
+ *   "stream.iggy.consumer.group": "pinot-consumer-group",
+ *   "stream.iggy.poll.batch.size": "100"
+ * }
+ * }</pre>
+ */
+public class IggyConsumerFactory extends StreamConsumerFactory {
+
+    private StreamConfig streamConfig;
+
+    @Override
+    public void init(StreamConfig streamConfig) {
+        this.streamConfig = streamConfig;
+    }
+
+    /**
+     * Creates a partition-level consumer for reading from a specific Iggy 
partition.
+     * Pinot calls this method for each partition that needs to be consumed.
+     *
+     * @param clientId unique identifier for this consumer instance
+     * @param groupId partition group identifier (partition ID in Iggy)
+     * @return a new partition consumer instance
+     */
+    public PartitionGroupConsumer createPartitionGroupConsumer(String 
clientId, int groupId) {

Review Comment:
   This method does not match any method declared in `StreamConsumerFactory`, 
so it does not override anything. The base class declares this signature 
instead:  
   ```java
    public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
         PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
   ```



##########
foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyConsumerFactory.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.consumer;
+
+import org.apache.iggy.connector.pinot.config.IggyStreamConfig;
+import org.apache.iggy.connector.pinot.metadata.IggyStreamMetadataProvider;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+
+/**
+ * Factory for creating Iggy stream consumers and metadata providers.
+ * This is the main entry point for Pinot's stream ingestion framework to 
interact with Iggy.
+ *
+ * <p>Configuration in Pinot table config:
+ * <pre>{@code
+ * "streamConfigs": {
+ *   "streamType": "iggy",
+ *   "stream.iggy.consumer.factory.class.name": 
"org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory",
+ *   "stream.iggy.host": "localhost",
+ *   "stream.iggy.port": "8090",
+ *   "stream.iggy.username": "iggy",
+ *   "stream.iggy.password": "iggy",
+ *   "stream.iggy.stream.id": "my-stream",
+ *   "stream.iggy.topic.id": "my-topic",
+ *   "stream.iggy.consumer.group": "pinot-consumer-group",
+ *   "stream.iggy.poll.batch.size": "100"
+ * }
+ * }</pre>
+ */
+public class IggyConsumerFactory extends StreamConsumerFactory {
+
+    private StreamConfig streamConfig;
+
+    @Override
+    public void init(StreamConfig streamConfig) {
+        this.streamConfig = streamConfig;
+    }
+
+    /**
+     * Creates a partition-level consumer for reading from a specific Iggy 
partition.
+     * Pinot calls this method for each partition that needs to be consumed.
+     *
+     * @param clientId unique identifier for this consumer instance
+     * @param groupId partition group identifier (partition ID in Iggy)
+     * @return a new partition consumer instance
+     */
+    public PartitionGroupConsumer createPartitionGroupConsumer(String 
clientId, int groupId) {
+        IggyStreamConfig iggyConfig = new IggyStreamConfig(this.streamConfig);
+        return new IggyPartitionGroupConsumer(clientId, iggyConfig, groupId);
+    }
+
+    /**
+     * Creates a partition-level consumer (newer Pinot API).
+     * Wraps the partition group consumer for compatibility.
+     *
+     * @param clientId unique identifier for this consumer instance
+     * @param partition partition identifier
+     * @return a new partition consumer instance
+     */
+    @Override
+    public PartitionLevelConsumer createPartitionLevelConsumer(String 
clientId, int partition) {

Review Comment:
   This method is marked as deprecated in the interface, so I'm not sure if we 
need this part of the implementation. 



##########
foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/performance/PerformanceBenchmarkTest.java:
##########


Review Comment:
   The `PerformanceBenchmarkTest` tests ArrayList/Long operations rather than 
actual connector behaviour (no network I/O, no Iggy server). The metrics in 
TEST_REPORT.md (e.g., "1.43M msg/sec") are based on these and could be 
misleading. I'd suggest removing both the test file and TEST_REPORT.md.



##########
foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/metadata/IggyStreamMetadataProvider.java:
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.metadata;
+
+import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
+import org.apache.iggy.connector.pinot.config.IggyStreamConfig;
+import org.apache.iggy.connector.pinot.consumer.IggyStreamPartitionMsgOffset;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.partition.Partition;
+import org.apache.iggy.topic.TopicDetails;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Metadata provider for Iggy streams.
+ * Provides information about partitions, offsets, and message counts.
+ *
+ * <p>This provider connects to Iggy via TCP to query:
+ * <ul>
+ *   <li>Number of partitions in a topic</li>
+ *   <li>Oldest available offset per partition</li>
+ *   <li>Latest offset per partition</li>
+ *   <li>Message counts</li>
+ * </ul>
+ */
+public class IggyStreamMetadataProvider implements StreamMetadataProvider {
+
+    private static final Logger log = 
LoggerFactory.getLogger(IggyStreamMetadataProvider.class);
+
+    private static final long DETAILS_CACHE_MS = 5000; // 5 seconds cache
+
+    private final String clientId;
+    private final IggyStreamConfig config;
+    private final Integer partitionId; // null for stream-level, non-null for 
partition-level
+
+    private AsyncIggyTcpClient asyncClient;
+    private StreamId streamId;
+    private TopicId topicId;
+    private TopicDetails cachedTopicDetails;
+    private long lastDetailsRefresh;
+
+    /**
+     * Creates a stream-level metadata provider (all partitions).
+     *
+     * @param clientId unique identifier
+     * @param config Iggy stream configuration
+     */
+    public IggyStreamMetadataProvider(String clientId, IggyStreamConfig 
config) {
+        this(clientId, config, null);
+    }
+
+    /**
+     * Creates a partition-level metadata provider.
+     *
+     * @param clientId unique identifier
+     * @param config Iggy stream configuration
+     * @param partitionId specific partition ID
+     */
+    public IggyStreamMetadataProvider(String clientId, IggyStreamConfig 
config, Integer partitionId) {
+        this.clientId = clientId;
+        this.config = config;
+        this.partitionId = partitionId;
+
+        log.info(
+                "Created IggyStreamMetadataProvider: clientId={}, 
partitionId={}, config={}",
+                clientId,
+                partitionId,
+                config);
+    }
+
+    /**
+     * Retrieves the number of partitions and their metadata.
+     * Called by Pinot to discover available partitions in the stream.
+     *
+     * @param timeoutMillis timeout for the operation
+     * @return number of partitions in the topic
+     */
+    @Override
+    public int fetchPartitionCount(long timeoutMillis) {
+        try {
+            ensureConnected();
+            TopicDetails topicDetails = fetchTopicDetails();
+            int partitionCount = topicDetails.partitionsCount().intValue();
+            log.info("Found {} partitions for topic {}", partitionCount, 
config.getTopicId());
+            return partitionCount;
+        } catch (RuntimeException e) {
+            log.error("Error fetching partition count: {}", e.getMessage(), e);
+            throw new RuntimeException("Failed to fetch partition count", e);
+        }
+    }
+
+    /**
+     * Fetches the current offset for consumption.
+     * For Iggy, we rely on consumer group state, so this returns the earliest 
offset.
+     *
+     * @param offsetCriteria offset criteria (earliest, latest, etc.)
+     * @param timeoutMillis timeout for the operation
+     * @return current offset for the partition
+     */
+    @Override
+    public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria 
offsetCriteria, long timeoutMillis) {
+        try {
+            ensureConnected();
+
+            if (partitionId == null) {
+                throw new IllegalStateException("Partition ID must be set for 
offset queries");
+            }
+
+            Partition partition = getPartitionInfo(partitionId);
+
+            // Handle offset criteria
+            if (offsetCriteria != null && offsetCriteria.isSmallest()) {
+                // Return earliest available offset (0 for Iggy)
+                return new IggyStreamPartitionMsgOffset(0);
+            } else if (offsetCriteria != null && offsetCriteria.isLargest()) {
+                // Return latest offset based on messages count
+                long latestOffset = partition.messagesCount().longValue();
+                return new IggyStreamPartitionMsgOffset(latestOffset);
+            } else {
+                // Default to consumer group managed offset (start from 0)
+                return new IggyStreamPartitionMsgOffset(0);
+            }
+
+        } catch (RuntimeException e) {
+            log.error("Error fetching partition offset: {}", e.getMessage(), 
e);
+            throw new RuntimeException("Failed to fetch partition offset", e);
+        }
+    }
+
+    /**
+     * Ensures TCP connection to Iggy server is established.
+     */
+    private void ensureConnected() {
+        if (asyncClient == null) {
+            log.info("Connecting to Iggy server: {}", 
config.getServerAddress());
+
+            asyncClient = AsyncIggyTcpClient.builder()
+                    .host(config.getHost())
+                    .port(config.getPort())
+                    .credentials(config.getUsername(), config.getPassword())
+                    .connectionPoolSize(config.getConnectionPoolSize())
+                    .build();
+
+            // Connect and authenticate
+            asyncClient.connect().join();
+
+            // Parse stream and topic IDs
+            streamId = parseStreamId(config.getStreamId());
+            topicId = parseTopicId(config.getTopicId());
+
+            log.info("Connected to Iggy server successfully");
+        }
+    }
+
+    /**
+     * Fetches topic details with caching.
+     */
+    private TopicDetails fetchTopicDetails() {
+        long now = System.currentTimeMillis();
+        if (cachedTopicDetails == null || (now - lastDetailsRefresh) > 
DETAILS_CACHE_MS) {
+            try {
+                Optional<TopicDetails> details =
+                        asyncClient.topics().getTopicAsync(streamId, 
topicId).join();
+                cachedTopicDetails =
+                        details.orElseThrow(() -> new RuntimeException("Topic 
not found: " + config.getTopicId()));
+                lastDetailsRefresh = now;
+            } catch (RuntimeException e) {
+                log.error("Error fetching topic details: {}", e.getMessage(), 
e);
+                throw new RuntimeException("Failed to fetch topic details", e);
+            }
+        }
+        return cachedTopicDetails;
+    }
+
+    /**
+     * Gets information for a specific partition.
+     */
+    private Partition getPartitionInfo(int partitionId) {
+        TopicDetails details = fetchTopicDetails();
+        return details.partitions().stream()
+                .filter(p -> p.id().intValue() == partitionId)
+                .findFirst()
+                .orElseThrow(() -> new IllegalArgumentException("Partition " + 
partitionId + " not found"));
+    }
+
+    /**
+     * Parses stream ID from string (supports both numeric and named streams).
+     */
+    private StreamId parseStreamId(String streamIdStr) {
+        try {
+            return StreamId.of(Long.parseLong(streamIdStr));
+        } catch (NumberFormatException e) {
+            return StreamId.of(streamIdStr);
+        }
+    }
+
+    /**
+     * Parses topic ID from string (supports both numeric and named topics).
+     */
+    private TopicId parseTopicId(String topicIdStr) {
+        try {
+            return TopicId.of(Long.parseLong(topicIdStr));
+        } catch (NumberFormatException e) {
+            return TopicId.of(topicIdStr);
+        }
+    }
+
+    /**
+     * Fetches the latest offsets available for the specified partitions.
+     * Used by Pinot for ingestion delay tracking.
+     * Note: This method is required by Pinot runtime but may not be in 
compile-time interface.
+     *
+     * @param partitions set of partition IDs to fetch offsets for
+     * @param timeoutMillis timeout for the operation
+     * @return map of partition IDs to their latest offsets
+     */
+    public Map<Integer, StreamPartitionMsgOffset> 
fetchLatestStreamOffset(Set<Integer> partitions, long timeoutMillis) {
+        Map<Integer, StreamPartitionMsgOffset> offsets = new HashMap<>();
+
+        try {
+            ensureConnected();
+
+            for (Integer partition : partitions) {
+                Partition partitionInfo = getPartitionInfo(partition);
+                long latestOffset = partitionInfo.messagesCount().longValue();
+                log.debug("Latest offset for partition {}: {}", partition, 
latestOffset);
+                offsets.put(partition, new 
IggyStreamPartitionMsgOffset(latestOffset));
+            }
+
+            return offsets;
+        } catch (RuntimeException e) {
+            log.error("Error fetching latest offsets: {}", e.getMessage(), e);
+            throw new RuntimeException("Failed to fetch latest offsets", e);
+        }
+    }
+
+    /**
+     * Indicates whether this stream supports offset lag tracking.
+     * Iggy supports offset lag since we can track current vs latest offset.
+     * Note: This method is required by Pinot runtime but may not be in 
compile-time interface.
+     *
+     * @return true if offset lag is supported
+     */
+    public boolean supportsOffsetLag() {
+        return true;
+    }
+
+    @Override
+    public void close() throws IOException {

Review Comment:
   This method does not throw `IOException`, so the `throws IOException` clause 
can be removed from its signature.



##########
foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyMessageBatch.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.consumer;
+
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.util.List;
+
+/**
+ * Implementation of Pinot's MessageBatch for Iggy messages.
+ * Wraps a list of messages with their offsets for consumption by Pinot.
+ */
+public class IggyMessageBatch implements MessageBatch<byte[]> {
+
+    private final List<IggyMessageAndOffset> messages;
+    private int currentIndex = 0;

Review Comment:
   `currentIndex` is not used anywhere. If that's correct, then please remove 
it.  



##########
foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/metadata/IggyStreamMetadataProvider.java:
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.metadata;
+
+import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
+import org.apache.iggy.connector.pinot.config.IggyStreamConfig;
+import org.apache.iggy.connector.pinot.consumer.IggyStreamPartitionMsgOffset;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.partition.Partition;
+import org.apache.iggy.topic.TopicDetails;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Metadata provider for Iggy streams.
+ * Provides information about partitions, offsets, and message counts.
+ *
+ * <p>This provider connects to Iggy via TCP to query:
+ * <ul>
+ *   <li>Number of partitions in a topic</li>
+ *   <li>Oldest available offset per partition</li>
+ *   <li>Latest offset per partition</li>
+ *   <li>Message counts</li>
+ * </ul>
+ */
+public class IggyStreamMetadataProvider implements StreamMetadataProvider {
+
+    private static final Logger log = 
LoggerFactory.getLogger(IggyStreamMetadataProvider.class);
+
+    private static final long DETAILS_CACHE_MS = 5000; // 5 seconds cache
+
+    private final String clientId;
+    private final IggyStreamConfig config;
+    private final Integer partitionId; // null for stream-level, non-null for 
partition-level
+
+    private AsyncIggyTcpClient asyncClient;
+    private StreamId streamId;
+    private TopicId topicId;
+    private TopicDetails cachedTopicDetails;
+    private long lastDetailsRefresh;
+
+    /**
+     * Creates a stream-level metadata provider (all partitions).
+     *
+     * @param clientId unique identifier
+     * @param config Iggy stream configuration
+     */
+    public IggyStreamMetadataProvider(String clientId, IggyStreamConfig 
config) {
+        this(clientId, config, null);
+    }
+
+    /**
+     * Creates a partition-level metadata provider.
+     *
+     * @param clientId unique identifier
+     * @param config Iggy stream configuration
+     * @param partitionId specific partition ID
+     */
+    public IggyStreamMetadataProvider(String clientId, IggyStreamConfig 
config, Integer partitionId) {
+        this.clientId = clientId;
+        this.config = config;
+        this.partitionId = partitionId;
+
+        log.info(
+                "Created IggyStreamMetadataProvider: clientId={}, 
partitionId={}, config={}",
+                clientId,
+                partitionId,
+                config);
+    }
+
+    /**
+     * Retrieves the number of partitions and their metadata.
+     * Called by Pinot to discover available partitions in the stream.
+     *
+     * @param timeoutMillis timeout for the operation
+     * @return number of partitions in the topic
+     */
+    @Override
+    public int fetchPartitionCount(long timeoutMillis) {
+        try {
+            ensureConnected();
+            TopicDetails topicDetails = fetchTopicDetails();
+            int partitionCount = topicDetails.partitionsCount().intValue();
+            log.info("Found {} partitions for topic {}", partitionCount, 
config.getTopicId());
+            return partitionCount;
+        } catch (RuntimeException e) {
+            log.error("Error fetching partition count: {}", e.getMessage(), e);
+            throw new RuntimeException("Failed to fetch partition count", e);
+        }
+    }
+
+    /**
+     * Fetches the current offset for consumption.
+     * For Iggy, we rely on consumer group state, so this returns the earliest 
offset.
+     *
+     * @param offsetCriteria offset criteria (earliest, latest, etc.)
+     * @param timeoutMillis timeout for the operation
+     * @return current offset for the partition
+     */
+    @Override
+    public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria 
offsetCriteria, long timeoutMillis) {
+        try {
+            ensureConnected();
+
+            if (partitionId == null) {
+                throw new IllegalStateException("Partition ID must be set for 
offset queries");
+            }
+
+            Partition partition = getPartitionInfo(partitionId);
+
+            // Handle offset criteria
+            if (offsetCriteria != null && offsetCriteria.isSmallest()) {
+                // Return earliest available offset (0 for Iggy)
+                return new IggyStreamPartitionMsgOffset(0);
+            } else if (offsetCriteria != null && offsetCriteria.isLargest()) {
+                // Return latest offset based on messages count
+                long latestOffset = partition.messagesCount().longValue();
+                return new IggyStreamPartitionMsgOffset(latestOffset);
+            } else {
+                // Default to consumer group managed offset (start from 0)
+                return new IggyStreamPartitionMsgOffset(0);
+            }
+
+        } catch (RuntimeException e) {
+            log.error("Error fetching partition offset: {}", e.getMessage(), 
e);
+            throw new RuntimeException("Failed to fetch partition offset", e);
+        }
+    }
+
+    /**
+     * Ensures TCP connection to Iggy server is established.
+     */
+    private void ensureConnected() {
+        if (asyncClient == null) {
+            log.info("Connecting to Iggy server: {}", 
config.getServerAddress());
+
+            asyncClient = AsyncIggyTcpClient.builder()
+                    .host(config.getHost())
+                    .port(config.getPort())
+                    .credentials(config.getUsername(), config.getPassword())
+                    .connectionPoolSize(config.getConnectionPoolSize())
+                    .build();
+
+            // Connect and authenticate
+            asyncClient.connect().join();
+
+            // Parse stream and topic IDs
+            streamId = parseStreamId(config.getStreamId());
+            topicId = parseTopicId(config.getTopicId());
+
+            log.info("Connected to Iggy server successfully");
+        }
+    }
+
+    /**
+     * Fetches topic details with caching.
+     */
+    private TopicDetails fetchTopicDetails() {
+        long now = System.currentTimeMillis();
+        if (cachedTopicDetails == null || (now - lastDetailsRefresh) > 
DETAILS_CACHE_MS) {
+            try {
+                Optional<TopicDetails> details =
+                        asyncClient.topics().getTopicAsync(streamId, 
topicId).join();
+                cachedTopicDetails =
+                        details.orElseThrow(() -> new RuntimeException("Topic 
not found: " + config.getTopicId()));
+                lastDetailsRefresh = now;
+            } catch (RuntimeException e) {
+                log.error("Error fetching topic details: {}", e.getMessage(), 
e);
+                throw new RuntimeException("Failed to fetch topic details", e);
+            }
+        }
+        return cachedTopicDetails;
+    }
+
+    /**
+     * Gets information for a specific partition.
+     */
+    private Partition getPartitionInfo(int partitionId) {
+        TopicDetails details = fetchTopicDetails();
+        return details.partitions().stream()
+                .filter(p -> p.id().intValue() == partitionId)
+                .findFirst()
+                .orElseThrow(() -> new IllegalArgumentException("Partition " + 
partitionId + " not found"));
+    }
+
+    /**
+     * Parses stream ID from string (supports both numeric and named streams).
+     */
+    private StreamId parseStreamId(String streamIdStr) {
+        try {
+            return StreamId.of(Long.parseLong(streamIdStr));
+        } catch (NumberFormatException e) {
+            return StreamId.of(streamIdStr);
+        }
+    }
+
+    /**
+     * Parses topic ID from string (supports both numeric and named topics).
+     */
+    private TopicId parseTopicId(String topicIdStr) {
+        try {
+            return TopicId.of(Long.parseLong(topicIdStr));
+        } catch (NumberFormatException e) {
+            return TopicId.of(topicIdStr);
+        }
+    }
+
+    /**
+     * Fetches the latest offsets available for the specified partitions.
+     * Used by Pinot for ingestion delay tracking.
+     * Note: This method is required by Pinot runtime but may not be in 
compile-time interface.
+     *
+     * @param partitions set of partition IDs to fetch offsets for
+     * @param timeoutMillis timeout for the operation
+     * @return map of partition IDs to their latest offsets
+     */
+    public Map<Integer, StreamPartitionMsgOffset> 
fetchLatestStreamOffset(Set<Integer> partitions, long timeoutMillis) {
+        Map<Integer, StreamPartitionMsgOffset> offsets = new HashMap<>();
+
+        try {
+            ensureConnected();
+
+            for (Integer partition : partitions) {
+                Partition partitionInfo = getPartitionInfo(partition);
+                long latestOffset = partitionInfo.messagesCount().longValue();
+                log.debug("Latest offset for partition {}: {}", partition, 
latestOffset);
+                offsets.put(partition, new 
IggyStreamPartitionMsgOffset(latestOffset));
+            }
+
+            return offsets;
+        } catch (RuntimeException e) {
+            log.error("Error fetching latest offsets: {}", e.getMessage(), e);
+            throw new RuntimeException("Failed to fetch latest offsets", e);
+        }
+    }
+
+    /**
+     * Indicates whether this stream supports offset lag tracking.
+     * Iggy supports offset lag since we can track current vs latest offset.
+     * Note: This method is required by Pinot runtime but may not be in 
compile-time interface.
+     *
+     * @return true if offset lag is supported
+     */
+    public boolean supportsOffsetLag() {

Review Comment:
   Same as above: please provide a docs reference for the claim that this 
method is required by the Pinot runtime.



##########
foreign/java/external-processors/iggy-connector-pinot/build.gradle.kts:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+    id("iggy.java-library-conventions")
+}
+
+dependencies {
+    // Iggy SDK - use local project when building within Iggy repository
+    api(project(":iggy"))
+
+    // Apache Pinot dependencies (provided - not bundled with connector)
+    compileOnly("org.apache.pinot:pinot-spi:1.2.0")

Review Comment:
   Please add the version to the `libs.versions` file so that it is easier to 
manage. The same applies to the Jackson v2 dependency below.



##########
foreign/java/external-processors/iggy-connector-pinot/integration-test.sh:
##########


Review Comment:
   Can this be implemented in Java with Testcontainers? That way the 
integration test would automatically be included in CI, and I believe it would 
be much easier to maintain than a bash script.



##########
foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/metadata/IggyStreamMetadataProvider.java:
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.metadata;
+
+import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
+import org.apache.iggy.connector.pinot.config.IggyStreamConfig;
+import org.apache.iggy.connector.pinot.consumer.IggyStreamPartitionMsgOffset;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.partition.Partition;
+import org.apache.iggy.topic.TopicDetails;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Metadata provider for Iggy streams.
+ * Provides information about partitions, offsets, and message counts.
+ *
+ * <p>This provider connects to Iggy via TCP to query:
+ * <ul>
+ *   <li>Number of partitions in a topic</li>
+ *   <li>Oldest available offset per partition</li>
+ *   <li>Latest offset per partition</li>
+ *   <li>Message counts</li>
+ * </ul>
+ */
+public class IggyStreamMetadataProvider implements StreamMetadataProvider {
+
+    private static final Logger log = 
LoggerFactory.getLogger(IggyStreamMetadataProvider.class);
+
+    private static final long DETAILS_CACHE_MS = 5000; // 5 seconds cache
+
+    private final String clientId;
+    private final IggyStreamConfig config;
+    private final Integer partitionId; // null for stream-level, non-null for 
partition-level
+
+    private AsyncIggyTcpClient asyncClient;
+    private StreamId streamId;
+    private TopicId topicId;
+    private TopicDetails cachedTopicDetails;
+    private long lastDetailsRefresh;
+
+    /**
+     * Creates a stream-level metadata provider (all partitions).
+     *
+     * @param clientId unique identifier
+     * @param config Iggy stream configuration
+     */
+    public IggyStreamMetadataProvider(String clientId, IggyStreamConfig 
config) {
+        this(clientId, config, null);
+    }
+
+    /**
+     * Creates a partition-level metadata provider.
+     *
+     * @param clientId unique identifier
+     * @param config Iggy stream configuration
+     * @param partitionId specific partition ID
+     */
+    public IggyStreamMetadataProvider(String clientId, IggyStreamConfig 
config, Integer partitionId) {
+        this.clientId = clientId;
+        this.config = config;
+        this.partitionId = partitionId;
+
+        log.info(
+                "Created IggyStreamMetadataProvider: clientId={}, 
partitionId={}, config={}",
+                clientId,
+                partitionId,
+                config);
+    }
+
+    /**
+     * Retrieves the number of partitions and their metadata.
+     * Called by Pinot to discover available partitions in the stream.
+     *
+     * @param timeoutMillis timeout for the operation
+     * @return number of partitions in the topic
+     */
+    @Override
+    public int fetchPartitionCount(long timeoutMillis) {
+        try {
+            ensureConnected();
+            TopicDetails topicDetails = fetchTopicDetails();
+            int partitionCount = topicDetails.partitionsCount().intValue();
+            log.info("Found {} partitions for topic {}", partitionCount, 
config.getTopicId());
+            return partitionCount;
+        } catch (RuntimeException e) {
+            log.error("Error fetching partition count: {}", e.getMessage(), e);
+            throw new RuntimeException("Failed to fetch partition count", e);
+        }
+    }
+
+    /**
+     * Fetches the current offset for consumption.
+     * For Iggy, we rely on consumer group state, so this returns the earliest 
offset.
+     *
+     * @param offsetCriteria offset criteria (earliest, latest, etc.)
+     * @param timeoutMillis timeout for the operation
+     * @return current offset for the partition
+     */
+    @Override
+    public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria 
offsetCriteria, long timeoutMillis) {
+        try {
+            ensureConnected();
+
+            if (partitionId == null) {
+                throw new IllegalStateException("Partition ID must be set for 
offset queries");
+            }
+
+            Partition partition = getPartitionInfo(partitionId);
+
+            // Handle offset criteria
+            if (offsetCriteria != null && offsetCriteria.isSmallest()) {
+                // Return earliest available offset (0 for Iggy)
+                return new IggyStreamPartitionMsgOffset(0);
+            } else if (offsetCriteria != null && offsetCriteria.isLargest()) {
+                // Return latest offset based on messages count
+                long latestOffset = partition.messagesCount().longValue();
+                return new IggyStreamPartitionMsgOffset(latestOffset);
+            } else {
+                // Default to consumer group managed offset (start from 0)
+                return new IggyStreamPartitionMsgOffset(0);
+            }
+
+        } catch (RuntimeException e) {
+            log.error("Error fetching partition offset: {}", e.getMessage(), 
e);
+            throw new RuntimeException("Failed to fetch partition offset", e);
+        }
+    }
+
+    /**
+     * Ensures TCP connection to Iggy server is established.
+     */
+    private void ensureConnected() {
+        if (asyncClient == null) {
+            log.info("Connecting to Iggy server: {}", 
config.getServerAddress());
+
+            asyncClient = AsyncIggyTcpClient.builder()
+                    .host(config.getHost())
+                    .port(config.getPort())
+                    .credentials(config.getUsername(), config.getPassword())
+                    .connectionPoolSize(config.getConnectionPoolSize())
+                    .build();
+
+            // Connect and authenticate
+            asyncClient.connect().join();
+
+            // Parse stream and topic IDs
+            streamId = parseStreamId(config.getStreamId());
+            topicId = parseTopicId(config.getTopicId());
+
+            log.info("Connected to Iggy server successfully");
+        }
+    }
+
+    /**
+     * Fetches topic details with caching.
+     */
+    private TopicDetails fetchTopicDetails() {
+        long now = System.currentTimeMillis();
+        if (cachedTopicDetails == null || (now - lastDetailsRefresh) > 
DETAILS_CACHE_MS) {
+            try {
+                Optional<TopicDetails> details =
+                        asyncClient.topics().getTopicAsync(streamId, 
topicId).join();
+                cachedTopicDetails =
+                        details.orElseThrow(() -> new RuntimeException("Topic 
not found: " + config.getTopicId()));
+                lastDetailsRefresh = now;
+            } catch (RuntimeException e) {
+                log.error("Error fetching topic details: {}", e.getMessage(), 
e);
+                throw new RuntimeException("Failed to fetch topic details", e);
+            }
+        }
+        return cachedTopicDetails;
+    }
+
+    /**
+     * Gets information for a specific partition.
+     */
+    private Partition getPartitionInfo(int partitionId) {
+        TopicDetails details = fetchTopicDetails();
+        return details.partitions().stream()
+                .filter(p -> p.id().intValue() == partitionId)
+                .findFirst()
+                .orElseThrow(() -> new IllegalArgumentException("Partition " + 
partitionId + " not found"));
+    }
+
+    /**
+     * Parses stream ID from string (supports both numeric and named streams).
+     */
+    private StreamId parseStreamId(String streamIdStr) {
+        try {
+            return StreamId.of(Long.parseLong(streamIdStr));
+        } catch (NumberFormatException e) {
+            return StreamId.of(streamIdStr);
+        }
+    }
+
+    /**
+     * Parses topic ID from string (supports both numeric and named topics).
+     */
+    private TopicId parseTopicId(String topicIdStr) {
+        try {
+            return TopicId.of(Long.parseLong(topicIdStr));
+        } catch (NumberFormatException e) {
+            return TopicId.of(topicIdStr);
+        }
+    }
+
+    /**
+     * Fetches the latest offsets available for the specified partitions.
+     * Used by Pinot for ingestion delay tracking.
+     * Note: This method is required by Pinot runtime but may not be in 
compile-time interface.
+     *
+     * @param partitions set of partition IDs to fetch offsets for
+     * @param timeoutMillis timeout for the operation
+     * @return map of partition IDs to their latest offsets
+     */
+    public Map<Integer, StreamPartitionMsgOffset> 
fetchLatestStreamOffset(Set<Integer> partitions, long timeoutMillis) {

Review Comment:
   Can you please provide a documentation reference where the requirement for 
this method is mentioned? If it is not part of the interface, then the Javadoc 
should at least contain a link where we can validate the method's signature.



##########
.markdownlint.yml:
##########


Review Comment:
   Please do not disable the markdown lints; instead, update the newly added 
files to comply with them.



##########
foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionGroupConsumer.java:
##########
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.consumer;
+
+import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
+import org.apache.iggy.connector.pinot.config.IggyStreamConfig;
+import org.apache.iggy.consumergroup.Consumer;
+import org.apache.iggy.identifier.ConsumerId;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.PolledMessages;
+import org.apache.iggy.message.PollingStrategy;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Partition-level consumer implementation for Iggy streams.
+ * Reads messages from a single Iggy partition using the AsyncIggyTcpClient.
+ *
+ * <p>This consumer manages:
+ * <ul>
+ *   <li>TCP connection to Iggy server</li>
+ *   <li>Consumer group membership</li>
+ *   <li>Message polling with offset tracking</li>
+ *   <li>Automatic offset commit for consumer group</li>
+ * </ul>
+ */
+public class IggyPartitionGroupConsumer implements PartitionGroupConsumer {
+
+    private static final Logger log = 
LoggerFactory.getLogger(IggyPartitionGroupConsumer.class);
+
+    private final String clientId;
+    private final IggyStreamConfig config;
+    private final int partitionId;
+
+    private AsyncIggyTcpClient asyncClient;
+    private StreamId streamId;
+    private TopicId topicId;
+    private Consumer consumer;
+    private boolean consumerGroupJoined;
+    private long currentOffset;
+
+    /**
+     * Creates a new partition consumer.
+     *
+     * @param clientId unique identifier for this consumer
+     * @param config Iggy stream configuration
+     * @param partitionId the partition to consume from
+     */
+    public IggyPartitionGroupConsumer(String clientId, IggyStreamConfig 
config, int partitionId) {
+        this.clientId = clientId;
+        this.config = config;
+        this.partitionId = partitionId;
+        this.consumerGroupJoined = false;
+        this.currentOffset = 0;
+
+        log.info(
+                "Created IggyPartitionGroupConsumer: clientId={}, 
partition={}, config={}",
+                clientId,
+                partitionId,
+                config);
+    }
+
+    /**
+     * Fetches the next batch of messages from the Iggy partition.
+     * This method is called repeatedly by Pinot to poll for new messages.
+     *
+     * @param startOffset the offset to start consuming from (may be null)
+     * @param timeoutMillis timeout for the fetch operation
+     * @return batch of messages, or empty batch if no messages available
+     */
+    public MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, 
long timeoutMillis) {

Review Comment:
   This declaration does not match the interface; please fix it and add the 
`@Override` annotation.



##########
foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionGroupConsumer.java:
##########
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.consumer;
+
+import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
+import org.apache.iggy.connector.pinot.config.IggyStreamConfig;
+import org.apache.iggy.consumergroup.Consumer;
+import org.apache.iggy.identifier.ConsumerId;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.PolledMessages;
+import org.apache.iggy.message.PollingStrategy;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Partition-level consumer implementation for Iggy streams.
+ * Reads messages from a single Iggy partition using the AsyncIggyTcpClient.
+ *
+ * <p>This consumer manages:
+ * <ul>
+ *   <li>TCP connection to Iggy server</li>
+ *   <li>Consumer group membership</li>
+ *   <li>Message polling with offset tracking</li>
+ *   <li>Automatic offset commit for consumer group</li>
+ * </ul>
+ */
+public class IggyPartitionGroupConsumer implements PartitionGroupConsumer {
+
+    private static final Logger log = 
LoggerFactory.getLogger(IggyPartitionGroupConsumer.class);
+
+    private final String clientId;
+    private final IggyStreamConfig config;
+    private final int partitionId;
+
+    private AsyncIggyTcpClient asyncClient;
+    private StreamId streamId;
+    private TopicId topicId;
+    private Consumer consumer;
+    private boolean consumerGroupJoined;
+    private long currentOffset;
+
+    /**
+     * Creates a new partition consumer.
+     *
+     * @param clientId unique identifier for this consumer
+     * @param config Iggy stream configuration
+     * @param partitionId the partition to consume from
+     */
+    public IggyPartitionGroupConsumer(String clientId, IggyStreamConfig 
config, int partitionId) {
+        this.clientId = clientId;
+        this.config = config;
+        this.partitionId = partitionId;
+        this.consumerGroupJoined = false;
+        this.currentOffset = 0;
+
+        log.info(
+                "Created IggyPartitionGroupConsumer: clientId={}, 
partition={}, config={}",
+                clientId,
+                partitionId,
+                config);
+    }
+
+    /**
+     * Fetches the next batch of messages from the Iggy partition.
+     * This method is called repeatedly by Pinot to poll for new messages.
+     *
+     * @param startOffset the offset to start consuming from (may be null)
+     * @param timeoutMillis timeout for the fetch operation
+     * @return batch of messages, or empty batch if no messages available
+     */
+    public MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, 
long timeoutMillis) {
+        try {
+            ensureConnected();
+
+            // No need to join consumer group when using single consumer
+
+            // Determine starting offset
+            long fetchOffset = determineStartOffset(startOffset);
+            log.debug("Fetching messages from partition {} at offset {}", 
partitionId, fetchOffset);
+
+            // Poll messages from Iggy
+            PolledMessages polledMessages = pollMessages(fetchOffset);
+            log.debug(
+                    "Polled {} messages from partition {}",
+                    polledMessages.messages().size(),
+                    partitionId);
+
+            // Convert to Pinot MessageBatch
+            MessageBatch batch = convertToMessageBatch(polledMessages);
+            return batch;
+
+        } catch (RuntimeException e) {
+            log.error("Error fetching messages from partition {}: {}", 
partitionId, e.getMessage(), e);
+            return new IggyMessageBatch(new ArrayList<>());
+        }
+    }
+
+    /**
+     * Ensures TCP connection to Iggy server is established.
+     */
+    private void ensureConnected() {
+        if (asyncClient == null) {
+            log.info("Connecting to Iggy server: {}", 
config.getServerAddress());
+
+            asyncClient = AsyncIggyTcpClient.builder()
+                    .host(config.getHost())
+                    .port(config.getPort())
+                    .credentials(config.getUsername(), config.getPassword())
+                    .connectionPoolSize(config.getConnectionPoolSize())
+                    .build();
+
+            // Connect and authenticate
+            asyncClient.connect().join();
+
+            // Parse stream and topic IDs
+            streamId = parseStreamId(config.getStreamId());
+            topicId = parseTopicId(config.getTopicId());
+            // Use single consumer instead of consumer group for explicit 
offset control
+            consumer = Consumer.of(ConsumerId.of(Long.valueOf(partitionId)));
+
+            log.info("Connected to Iggy server successfully");
+        }
+    }
+
+    /**
+     * Joins the consumer group for this partition.
+     * This operation is idempotent in Iggy.
+     */
+    private void joinConsumerGroup() {

Review Comment:
   As this code does not use a consumer group, do we need this function? It's 
unused.



##########
foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionGroupConsumer.java:
##########
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.consumer;
+
+import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
+import org.apache.iggy.connector.pinot.config.IggyStreamConfig;
+import org.apache.iggy.consumergroup.Consumer;
+import org.apache.iggy.identifier.ConsumerId;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.PolledMessages;
+import org.apache.iggy.message.PollingStrategy;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Partition-level consumer implementation for Iggy streams.
+ * Reads messages from a single Iggy partition using the AsyncIggyTcpClient.
+ *
+ * <p>This consumer manages:
+ * <ul>
+ *   <li>TCP connection to Iggy server</li>
+ *   <li>Consumer group membership</li>
+ *   <li>Message polling with offset tracking</li>
+ *   <li>Automatic offset commit for consumer group</li>
+ * </ul>
+ */
+public class IggyPartitionGroupConsumer implements PartitionGroupConsumer {
+
+    private static final Logger log = 
LoggerFactory.getLogger(IggyPartitionGroupConsumer.class);
+
+    private final String clientId;

Review Comment:
   `clientId` and `consumerGroupJoined` are unused; please remove them if they 
are not needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to