mmodzelewski commented on code in PR #2499: URL: https://github.com/apache/iggy/pull/2499#discussion_r2639284988
########## foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyConsumerFactory.java: ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.pinot.consumer; + +import org.apache.iggy.connector.pinot.config.IggyStreamConfig; +import org.apache.iggy.connector.pinot.metadata.IggyStreamMetadataProvider; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.PartitionLevelConsumer; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConsumerFactory; +import org.apache.pinot.spi.stream.StreamMetadataProvider; + +/** + * Factory for creating Iggy stream consumers and metadata providers. + * This is the main entry point for Pinot's stream ingestion framework to interact with Iggy. 
+ * + * <p>Configuration in Pinot table config: + * <pre>{@code + * "streamConfigs": { + * "streamType": "iggy", + * "stream.iggy.consumer.factory.class.name": "org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory", + * "stream.iggy.host": "localhost", + * "stream.iggy.port": "8090", + * "stream.iggy.username": "iggy", + * "stream.iggy.password": "iggy", + * "stream.iggy.stream.id": "my-stream", + * "stream.iggy.topic.id": "my-topic", + * "stream.iggy.consumer.group": "pinot-consumer-group", + * "stream.iggy.poll.batch.size": "100" + * } + * }</pre> + */ +public class IggyConsumerFactory extends StreamConsumerFactory { + + private StreamConfig streamConfig; + + @Override + public void init(StreamConfig streamConfig) { + this.streamConfig = streamConfig; + } + + /** + * Creates a partition-level consumer for reading from a specific Iggy partition. + * Pinot calls this method for each partition that needs to be consumed. + * + * @param clientId unique identifier for this consumer instance + * @param groupId partition group identifier (partition ID in Iggy) + * @return a new partition consumer instance + */ + public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, int groupId) { Review Comment: This method does not match the interface declaration. The interface has this one, though: ```java public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) { ``` ########## foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyConsumerFactory.java: ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.pinot.consumer; + +import org.apache.iggy.connector.pinot.config.IggyStreamConfig; +import org.apache.iggy.connector.pinot.metadata.IggyStreamMetadataProvider; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.PartitionLevelConsumer; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConsumerFactory; +import org.apache.pinot.spi.stream.StreamMetadataProvider; + +/** + * Factory for creating Iggy stream consumers and metadata providers. + * This is the main entry point for Pinot's stream ingestion framework to interact with Iggy. 
+ * + * <p>Configuration in Pinot table config: + * <pre>{@code + * "streamConfigs": { + * "streamType": "iggy", + * "stream.iggy.consumer.factory.class.name": "org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory", + * "stream.iggy.host": "localhost", + * "stream.iggy.port": "8090", + * "stream.iggy.username": "iggy", + * "stream.iggy.password": "iggy", + * "stream.iggy.stream.id": "my-stream", + * "stream.iggy.topic.id": "my-topic", + * "stream.iggy.consumer.group": "pinot-consumer-group", + * "stream.iggy.poll.batch.size": "100" + * } + * }</pre> + */ +public class IggyConsumerFactory extends StreamConsumerFactory { + + private StreamConfig streamConfig; + + @Override + public void init(StreamConfig streamConfig) { + this.streamConfig = streamConfig; + } + + /** + * Creates a partition-level consumer for reading from a specific Iggy partition. + * Pinot calls this method for each partition that needs to be consumed. + * + * @param clientId unique identifier for this consumer instance + * @param groupId partition group identifier (partition ID in Iggy) + * @return a new partition consumer instance + */ + public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, int groupId) { + IggyStreamConfig iggyConfig = new IggyStreamConfig(this.streamConfig); + return new IggyPartitionGroupConsumer(clientId, iggyConfig, groupId); + } + + /** + * Creates a partition-level consumer (newer Pinot API). + * Wraps the partition group consumer for compatibility. + * + * @param clientId unique identifier for this consumer instance + * @param partition partition identifier + * @return a new partition consumer instance + */ + @Override + public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) { Review Comment: This method is marked as deprecated in the interface, so I'm not sure if we need this part of the implementation. 
########## foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/performance/PerformanceBenchmarkTest.java: ########## Review Comment: The `PerformanceBenchmarkTest` tests ArrayList/Long operations rather than actual connector behaviour (no network I/O, no Iggy server). The metrics in TEST_REPORT.md (e.g., "1.43M msg/sec") are based on these and could be misleading. I'd suggest removing both the test file and TEST_REPORT.md. ########## foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/metadata/IggyStreamMetadataProvider.java: ########## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iggy.connector.pinot.metadata; + +import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient; +import org.apache.iggy.connector.pinot.config.IggyStreamConfig; +import org.apache.iggy.connector.pinot.consumer.IggyStreamPartitionMsgOffset; +import org.apache.iggy.identifier.StreamId; +import org.apache.iggy.identifier.TopicId; +import org.apache.iggy.partition.Partition; +import org.apache.iggy.topic.TopicDetails; +import org.apache.pinot.spi.stream.OffsetCriteria; +import org.apache.pinot.spi.stream.StreamMetadataProvider; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * Metadata provider for Iggy streams. + * Provides information about partitions, offsets, and message counts. + * + * <p>This provider connects to Iggy via TCP to query: + * <ul> + * <li>Number of partitions in a topic</li> + * <li>Oldest available offset per partition</li> + * <li>Latest offset per partition</li> + * <li>Message counts</li> + * </ul> + */ +public class IggyStreamMetadataProvider implements StreamMetadataProvider { + + private static final Logger log = LoggerFactory.getLogger(IggyStreamMetadataProvider.class); + + private static final long DETAILS_CACHE_MS = 5000; // 5 seconds cache + + private final String clientId; + private final IggyStreamConfig config; + private final Integer partitionId; // null for stream-level, non-null for partition-level + + private AsyncIggyTcpClient asyncClient; + private StreamId streamId; + private TopicId topicId; + private TopicDetails cachedTopicDetails; + private long lastDetailsRefresh; + + /** + * Creates a stream-level metadata provider (all partitions). 
+ * + * @param clientId unique identifier + * @param config Iggy stream configuration + */ + public IggyStreamMetadataProvider(String clientId, IggyStreamConfig config) { + this(clientId, config, null); + } + + /** + * Creates a partition-level metadata provider. + * + * @param clientId unique identifier + * @param config Iggy stream configuration + * @param partitionId specific partition ID + */ + public IggyStreamMetadataProvider(String clientId, IggyStreamConfig config, Integer partitionId) { + this.clientId = clientId; + this.config = config; + this.partitionId = partitionId; + + log.info( + "Created IggyStreamMetadataProvider: clientId={}, partitionId={}, config={}", + clientId, + partitionId, + config); + } + + /** + * Retrieves the number of partitions and their metadata. + * Called by Pinot to discover available partitions in the stream. + * + * @param timeoutMillis timeout for the operation + * @return number of partitions in the topic + */ + @Override + public int fetchPartitionCount(long timeoutMillis) { + try { + ensureConnected(); + TopicDetails topicDetails = fetchTopicDetails(); + int partitionCount = topicDetails.partitionsCount().intValue(); + log.info("Found {} partitions for topic {}", partitionCount, config.getTopicId()); + return partitionCount; + } catch (RuntimeException e) { + log.error("Error fetching partition count: {}", e.getMessage(), e); + throw new RuntimeException("Failed to fetch partition count", e); + } + } + + /** + * Fetches the current offset for consumption. + * For Iggy, we rely on consumer group state, so this returns the earliest offset. + * + * @param offsetCriteria offset criteria (earliest, latest, etc.) 
+ * @param timeoutMillis timeout for the operation + * @return current offset for the partition + */ + @Override + public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) { + try { + ensureConnected(); + + if (partitionId == null) { + throw new IllegalStateException("Partition ID must be set for offset queries"); + } + + Partition partition = getPartitionInfo(partitionId); + + // Handle offset criteria + if (offsetCriteria != null && offsetCriteria.isSmallest()) { + // Return earliest available offset (0 for Iggy) + return new IggyStreamPartitionMsgOffset(0); + } else if (offsetCriteria != null && offsetCriteria.isLargest()) { + // Return latest offset based on messages count + long latestOffset = partition.messagesCount().longValue(); + return new IggyStreamPartitionMsgOffset(latestOffset); + } else { + // Default to consumer group managed offset (start from 0) + return new IggyStreamPartitionMsgOffset(0); + } + + } catch (RuntimeException e) { + log.error("Error fetching partition offset: {}", e.getMessage(), e); + throw new RuntimeException("Failed to fetch partition offset", e); + } + } + + /** + * Ensures TCP connection to Iggy server is established. + */ + private void ensureConnected() { + if (asyncClient == null) { + log.info("Connecting to Iggy server: {}", config.getServerAddress()); + + asyncClient = AsyncIggyTcpClient.builder() + .host(config.getHost()) + .port(config.getPort()) + .credentials(config.getUsername(), config.getPassword()) + .connectionPoolSize(config.getConnectionPoolSize()) + .build(); + + // Connect and authenticate + asyncClient.connect().join(); + + // Parse stream and topic IDs + streamId = parseStreamId(config.getStreamId()); + topicId = parseTopicId(config.getTopicId()); + + log.info("Connected to Iggy server successfully"); + } + } + + /** + * Fetches topic details with caching. 
+ */ + private TopicDetails fetchTopicDetails() { + long now = System.currentTimeMillis(); + if (cachedTopicDetails == null || (now - lastDetailsRefresh) > DETAILS_CACHE_MS) { + try { + Optional<TopicDetails> details = + asyncClient.topics().getTopicAsync(streamId, topicId).join(); + cachedTopicDetails = + details.orElseThrow(() -> new RuntimeException("Topic not found: " + config.getTopicId())); + lastDetailsRefresh = now; + } catch (RuntimeException e) { + log.error("Error fetching topic details: {}", e.getMessage(), e); + throw new RuntimeException("Failed to fetch topic details", e); + } + } + return cachedTopicDetails; + } + + /** + * Gets information for a specific partition. + */ + private Partition getPartitionInfo(int partitionId) { + TopicDetails details = fetchTopicDetails(); + return details.partitions().stream() + .filter(p -> p.id().intValue() == partitionId) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Partition " + partitionId + " not found")); + } + + /** + * Parses stream ID from string (supports both numeric and named streams). + */ + private StreamId parseStreamId(String streamIdStr) { + try { + return StreamId.of(Long.parseLong(streamIdStr)); + } catch (NumberFormatException e) { + return StreamId.of(streamIdStr); + } + } + + /** + * Parses topic ID from string (supports both numeric and named topics). + */ + private TopicId parseTopicId(String topicIdStr) { + try { + return TopicId.of(Long.parseLong(topicIdStr)); + } catch (NumberFormatException e) { + return TopicId.of(topicIdStr); + } + } + + /** + * Fetches the latest offsets available for the specified partitions. + * Used by Pinot for ingestion delay tracking. + * Note: This method is required by Pinot runtime but may not be in compile-time interface. 
+ * + * @param partitions set of partition IDs to fetch offsets for + * @param timeoutMillis timeout for the operation + * @return map of partition IDs to their latest offsets + */ + public Map<Integer, StreamPartitionMsgOffset> fetchLatestStreamOffset(Set<Integer> partitions, long timeoutMillis) { + Map<Integer, StreamPartitionMsgOffset> offsets = new HashMap<>(); + + try { + ensureConnected(); + + for (Integer partition : partitions) { + Partition partitionInfo = getPartitionInfo(partition); + long latestOffset = partitionInfo.messagesCount().longValue(); + log.debug("Latest offset for partition {}: {}", partition, latestOffset); + offsets.put(partition, new IggyStreamPartitionMsgOffset(latestOffset)); + } + + return offsets; + } catch (RuntimeException e) { + log.error("Error fetching latest offsets: {}", e.getMessage(), e); + throw new RuntimeException("Failed to fetch latest offsets", e); + } + } + + /** + * Indicates whether this stream supports offset lag tracking. + * Iggy supports offset lag since we can track current vs latest offset. + * Note: This method is required by Pinot runtime but may not be in compile-time interface. + * + * @return true if offset lag is supported + */ + public boolean supportsOffsetLag() { + return true; + } + + @Override + public void close() throws IOException { Review Comment: This method does not throw `IOException`, so the `throws IOException` clause can be removed from its signature. ########## foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyMessageBatch.java: ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.pinot.consumer; + +import org.apache.pinot.spi.stream.MessageBatch; +import org.apache.pinot.spi.stream.StreamMessage; +import org.apache.pinot.spi.stream.StreamMessageMetadata; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; + +import java.util.List; + +/** + * Implementation of Pinot's MessageBatch for Iggy messages. + * Wraps a list of messages with their offsets for consumption by Pinot. + */ +public class IggyMessageBatch implements MessageBatch<byte[]> { + + private final List<IggyMessageAndOffset> messages; + private int currentIndex = 0; Review Comment: `currentIndex` is not used anywhere. If that's correct, then please remove it. ########## foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/metadata/IggyStreamMetadataProvider.java: ########## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.pinot.metadata; + +import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient; +import org.apache.iggy.connector.pinot.config.IggyStreamConfig; +import org.apache.iggy.connector.pinot.consumer.IggyStreamPartitionMsgOffset; +import org.apache.iggy.identifier.StreamId; +import org.apache.iggy.identifier.TopicId; +import org.apache.iggy.partition.Partition; +import org.apache.iggy.topic.TopicDetails; +import org.apache.pinot.spi.stream.OffsetCriteria; +import org.apache.pinot.spi.stream.StreamMetadataProvider; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * Metadata provider for Iggy streams. + * Provides information about partitions, offsets, and message counts. 
+ * + * <p>This provider connects to Iggy via TCP to query: + * <ul> + * <li>Number of partitions in a topic</li> + * <li>Oldest available offset per partition</li> + * <li>Latest offset per partition</li> + * <li>Message counts</li> + * </ul> + */ +public class IggyStreamMetadataProvider implements StreamMetadataProvider { + + private static final Logger log = LoggerFactory.getLogger(IggyStreamMetadataProvider.class); + + private static final long DETAILS_CACHE_MS = 5000; // 5 seconds cache + + private final String clientId; + private final IggyStreamConfig config; + private final Integer partitionId; // null for stream-level, non-null for partition-level + + private AsyncIggyTcpClient asyncClient; + private StreamId streamId; + private TopicId topicId; + private TopicDetails cachedTopicDetails; + private long lastDetailsRefresh; + + /** + * Creates a stream-level metadata provider (all partitions). + * + * @param clientId unique identifier + * @param config Iggy stream configuration + */ + public IggyStreamMetadataProvider(String clientId, IggyStreamConfig config) { + this(clientId, config, null); + } + + /** + * Creates a partition-level metadata provider. + * + * @param clientId unique identifier + * @param config Iggy stream configuration + * @param partitionId specific partition ID + */ + public IggyStreamMetadataProvider(String clientId, IggyStreamConfig config, Integer partitionId) { + this.clientId = clientId; + this.config = config; + this.partitionId = partitionId; + + log.info( + "Created IggyStreamMetadataProvider: clientId={}, partitionId={}, config={}", + clientId, + partitionId, + config); + } + + /** + * Retrieves the number of partitions and their metadata. + * Called by Pinot to discover available partitions in the stream. 
+ * + * @param timeoutMillis timeout for the operation + * @return number of partitions in the topic + */ + @Override + public int fetchPartitionCount(long timeoutMillis) { + try { + ensureConnected(); + TopicDetails topicDetails = fetchTopicDetails(); + int partitionCount = topicDetails.partitionsCount().intValue(); + log.info("Found {} partitions for topic {}", partitionCount, config.getTopicId()); + return partitionCount; + } catch (RuntimeException e) { + log.error("Error fetching partition count: {}", e.getMessage(), e); + throw new RuntimeException("Failed to fetch partition count", e); + } + } + + /** + * Fetches the current offset for consumption. + * For Iggy, we rely on consumer group state, so this returns the earliest offset. + * + * @param offsetCriteria offset criteria (earliest, latest, etc.) + * @param timeoutMillis timeout for the operation + * @return current offset for the partition + */ + @Override + public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) { + try { + ensureConnected(); + + if (partitionId == null) { + throw new IllegalStateException("Partition ID must be set for offset queries"); + } + + Partition partition = getPartitionInfo(partitionId); + + // Handle offset criteria + if (offsetCriteria != null && offsetCriteria.isSmallest()) { + // Return earliest available offset (0 for Iggy) + return new IggyStreamPartitionMsgOffset(0); + } else if (offsetCriteria != null && offsetCriteria.isLargest()) { + // Return latest offset based on messages count + long latestOffset = partition.messagesCount().longValue(); + return new IggyStreamPartitionMsgOffset(latestOffset); + } else { + // Default to consumer group managed offset (start from 0) + return new IggyStreamPartitionMsgOffset(0); + } + + } catch (RuntimeException e) { + log.error("Error fetching partition offset: {}", e.getMessage(), e); + throw new RuntimeException("Failed to fetch partition offset", e); + } + } + + /** + * 
Ensures TCP connection to Iggy server is established. + */ + private void ensureConnected() { + if (asyncClient == null) { + log.info("Connecting to Iggy server: {}", config.getServerAddress()); + + asyncClient = AsyncIggyTcpClient.builder() + .host(config.getHost()) + .port(config.getPort()) + .credentials(config.getUsername(), config.getPassword()) + .connectionPoolSize(config.getConnectionPoolSize()) + .build(); + + // Connect and authenticate + asyncClient.connect().join(); + + // Parse stream and topic IDs + streamId = parseStreamId(config.getStreamId()); + topicId = parseTopicId(config.getTopicId()); + + log.info("Connected to Iggy server successfully"); + } + } + + /** + * Fetches topic details with caching. + */ + private TopicDetails fetchTopicDetails() { + long now = System.currentTimeMillis(); + if (cachedTopicDetails == null || (now - lastDetailsRefresh) > DETAILS_CACHE_MS) { + try { + Optional<TopicDetails> details = + asyncClient.topics().getTopicAsync(streamId, topicId).join(); + cachedTopicDetails = + details.orElseThrow(() -> new RuntimeException("Topic not found: " + config.getTopicId())); + lastDetailsRefresh = now; + } catch (RuntimeException e) { + log.error("Error fetching topic details: {}", e.getMessage(), e); + throw new RuntimeException("Failed to fetch topic details", e); + } + } + return cachedTopicDetails; + } + + /** + * Gets information for a specific partition. + */ + private Partition getPartitionInfo(int partitionId) { + TopicDetails details = fetchTopicDetails(); + return details.partitions().stream() + .filter(p -> p.id().intValue() == partitionId) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Partition " + partitionId + " not found")); + } + + /** + * Parses stream ID from string (supports both numeric and named streams). 
+ */ + private StreamId parseStreamId(String streamIdStr) { + try { + return StreamId.of(Long.parseLong(streamIdStr)); + } catch (NumberFormatException e) { + return StreamId.of(streamIdStr); + } + } + + /** + * Parses topic ID from string (supports both numeric and named topics). + */ + private TopicId parseTopicId(String topicIdStr) { + try { + return TopicId.of(Long.parseLong(topicIdStr)); + } catch (NumberFormatException e) { + return TopicId.of(topicIdStr); + } + } + + /** + * Fetches the latest offsets available for the specified partitions. + * Used by Pinot for ingestion delay tracking. + * Note: This method is required by Pinot runtime but may not be in compile-time interface. + * + * @param partitions set of partition IDs to fetch offsets for + * @param timeoutMillis timeout for the operation + * @return map of partition IDs to their latest offsets + */ + public Map<Integer, StreamPartitionMsgOffset> fetchLatestStreamOffset(Set<Integer> partitions, long timeoutMillis) { + Map<Integer, StreamPartitionMsgOffset> offsets = new HashMap<>(); + + try { + ensureConnected(); + + for (Integer partition : partitions) { + Partition partitionInfo = getPartitionInfo(partition); + long latestOffset = partitionInfo.messagesCount().longValue(); + log.debug("Latest offset for partition {}: {}", partition, latestOffset); + offsets.put(partition, new IggyStreamPartitionMsgOffset(latestOffset)); + } + + return offsets; + } catch (RuntimeException e) { + log.error("Error fetching latest offsets: {}", e.getMessage(), e); + throw new RuntimeException("Failed to fetch latest offsets", e); + } + } + + /** + * Indicates whether this stream supports offset lag tracking. + * Iggy supports offset lag since we can track current vs latest offset. + * Note: This method is required by Pinot runtime but may not be in compile-time interface. 
+ * + * @return true if offset lag is supported + */ + public boolean supportsOffsetLag() { Review Comment: Same as above: please provide a documentation reference. ########## foreign/java/external-processors/iggy-connector-pinot/build.gradle.kts: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("iggy.java-library-conventions") +} + +dependencies { + // Iggy SDK - use local project when building within Iggy repository + api(project(":iggy")) + + // Apache Pinot dependencies (provided - not bundled with connector) + compileOnly("org.apache.pinot:pinot-spi:1.2.0") Review Comment: Please add the version to the `libs.versions` file so it's easier to manage. The same goes for the Jackson v2 version below. ########## foreign/java/external-processors/iggy-connector-pinot/integration-test.sh: ########## Review Comment: Can this be implemented in Java with Testcontainers? That way the integration tests would be added to CI automatically, and I believe they would be much easier to maintain than a bash script.
########## foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/metadata/IggyStreamMetadataProvider.java: ########## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.pinot.metadata; + +import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient; +import org.apache.iggy.connector.pinot.config.IggyStreamConfig; +import org.apache.iggy.connector.pinot.consumer.IggyStreamPartitionMsgOffset; +import org.apache.iggy.identifier.StreamId; +import org.apache.iggy.identifier.TopicId; +import org.apache.iggy.partition.Partition; +import org.apache.iggy.topic.TopicDetails; +import org.apache.pinot.spi.stream.OffsetCriteria; +import org.apache.pinot.spi.stream.StreamMetadataProvider; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * Metadata provider for Iggy streams. + * Provides information about partitions, offsets, and message counts. 
+ * + * <p>This provider connects to Iggy via TCP to query: + * <ul> + * <li>Number of partitions in a topic</li> + * <li>Oldest available offset per partition</li> + * <li>Latest offset per partition</li> + * <li>Message counts</li> + * </ul> + */ +public class IggyStreamMetadataProvider implements StreamMetadataProvider { + + private static final Logger log = LoggerFactory.getLogger(IggyStreamMetadataProvider.class); + + private static final long DETAILS_CACHE_MS = 5000; // 5 seconds cache + + private final String clientId; + private final IggyStreamConfig config; + private final Integer partitionId; // null for stream-level, non-null for partition-level + + private AsyncIggyTcpClient asyncClient; + private StreamId streamId; + private TopicId topicId; + private TopicDetails cachedTopicDetails; + private long lastDetailsRefresh; + + /** + * Creates a stream-level metadata provider (all partitions). + * + * @param clientId unique identifier + * @param config Iggy stream configuration + */ + public IggyStreamMetadataProvider(String clientId, IggyStreamConfig config) { + this(clientId, config, null); + } + + /** + * Creates a partition-level metadata provider. + * + * @param clientId unique identifier + * @param config Iggy stream configuration + * @param partitionId specific partition ID + */ + public IggyStreamMetadataProvider(String clientId, IggyStreamConfig config, Integer partitionId) { + this.clientId = clientId; + this.config = config; + this.partitionId = partitionId; + + log.info( + "Created IggyStreamMetadataProvider: clientId={}, partitionId={}, config={}", + clientId, + partitionId, + config); + } + + /** + * Retrieves the number of partitions and their metadata. + * Called by Pinot to discover available partitions in the stream. 
+ * + * @param timeoutMillis timeout for the operation + * @return number of partitions in the topic + */ + @Override + public int fetchPartitionCount(long timeoutMillis) { + try { + ensureConnected(); + TopicDetails topicDetails = fetchTopicDetails(); + int partitionCount = topicDetails.partitionsCount().intValue(); + log.info("Found {} partitions for topic {}", partitionCount, config.getTopicId()); + return partitionCount; + } catch (RuntimeException e) { + log.error("Error fetching partition count: {}", e.getMessage(), e); + throw new RuntimeException("Failed to fetch partition count", e); + } + } + + /** + * Fetches the current offset for consumption. + * For Iggy, we rely on consumer group state, so this returns the earliest offset. + * + * @param offsetCriteria offset criteria (earliest, latest, etc.) + * @param timeoutMillis timeout for the operation + * @return current offset for the partition + */ + @Override + public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) { + try { + ensureConnected(); + + if (partitionId == null) { + throw new IllegalStateException("Partition ID must be set for offset queries"); + } + + Partition partition = getPartitionInfo(partitionId); + + // Handle offset criteria + if (offsetCriteria != null && offsetCriteria.isSmallest()) { + // Return earliest available offset (0 for Iggy) + return new IggyStreamPartitionMsgOffset(0); + } else if (offsetCriteria != null && offsetCriteria.isLargest()) { + // Return latest offset based on messages count + long latestOffset = partition.messagesCount().longValue(); + return new IggyStreamPartitionMsgOffset(latestOffset); + } else { + // Default to consumer group managed offset (start from 0) + return new IggyStreamPartitionMsgOffset(0); + } + + } catch (RuntimeException e) { + log.error("Error fetching partition offset: {}", e.getMessage(), e); + throw new RuntimeException("Failed to fetch partition offset", e); + } + } + + /** + * 
Ensures TCP connection to Iggy server is established. + */ + private void ensureConnected() { + if (asyncClient == null) { + log.info("Connecting to Iggy server: {}", config.getServerAddress()); + + asyncClient = AsyncIggyTcpClient.builder() + .host(config.getHost()) + .port(config.getPort()) + .credentials(config.getUsername(), config.getPassword()) + .connectionPoolSize(config.getConnectionPoolSize()) + .build(); + + // Connect and authenticate + asyncClient.connect().join(); + + // Parse stream and topic IDs + streamId = parseStreamId(config.getStreamId()); + topicId = parseTopicId(config.getTopicId()); + + log.info("Connected to Iggy server successfully"); + } + } + + /** + * Fetches topic details with caching. + */ + private TopicDetails fetchTopicDetails() { + long now = System.currentTimeMillis(); + if (cachedTopicDetails == null || (now - lastDetailsRefresh) > DETAILS_CACHE_MS) { + try { + Optional<TopicDetails> details = + asyncClient.topics().getTopicAsync(streamId, topicId).join(); + cachedTopicDetails = + details.orElseThrow(() -> new RuntimeException("Topic not found: " + config.getTopicId())); + lastDetailsRefresh = now; + } catch (RuntimeException e) { + log.error("Error fetching topic details: {}", e.getMessage(), e); + throw new RuntimeException("Failed to fetch topic details", e); + } + } + return cachedTopicDetails; + } + + /** + * Gets information for a specific partition. + */ + private Partition getPartitionInfo(int partitionId) { + TopicDetails details = fetchTopicDetails(); + return details.partitions().stream() + .filter(p -> p.id().intValue() == partitionId) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Partition " + partitionId + " not found")); + } + + /** + * Parses stream ID from string (supports both numeric and named streams). 
+ */ + private StreamId parseStreamId(String streamIdStr) { + try { + return StreamId.of(Long.parseLong(streamIdStr)); + } catch (NumberFormatException e) { + return StreamId.of(streamIdStr); + } + } + + /** + * Parses topic ID from string (supports both numeric and named topics). + */ + private TopicId parseTopicId(String topicIdStr) { + try { + return TopicId.of(Long.parseLong(topicIdStr)); + } catch (NumberFormatException e) { + return TopicId.of(topicIdStr); + } + } + + /** + * Fetches the latest offsets available for the specified partitions. + * Used by Pinot for ingestion delay tracking. + * Note: This method is required by Pinot runtime but may not be in compile-time interface. + * + * @param partitions set of partition IDs to fetch offsets for + * @param timeoutMillis timeout for the operation + * @return map of partition IDs to their latest offsets + */ + public Map<Integer, StreamPartitionMsgOffset> fetchLatestStreamOffset(Set<Integer> partitions, long timeoutMillis) { Review Comment: Can you please provide a documentation reference where the requirement for this method is mentioned? If it is not part of the interface, then the javadoc should at least contain a link where we can validate the method's signature. ########## .markdownlint.yml: ########## Review Comment: Please do not disable the markdown lints; instead, update the newly added files to comply with them. ########## foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionGroupConsumer.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.pinot.consumer; + +import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient; +import org.apache.iggy.connector.pinot.config.IggyStreamConfig; +import org.apache.iggy.consumergroup.Consumer; +import org.apache.iggy.identifier.ConsumerId; +import org.apache.iggy.identifier.StreamId; +import org.apache.iggy.identifier.TopicId; +import org.apache.iggy.message.Message; +import org.apache.iggy.message.PolledMessages; +import org.apache.iggy.message.PollingStrategy; +import org.apache.pinot.spi.stream.MessageBatch; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/** + * Partition-level consumer implementation for Iggy streams. + * Reads messages from a single Iggy partition using the AsyncIggyTcpClient. 
+ * + * <p>This consumer manages: + * <ul> + * <li>TCP connection to Iggy server</li> + * <li>Consumer group membership</li> + * <li>Message polling with offset tracking</li> + * <li>Automatic offset commit for consumer group</li> + * </ul> + */ +public class IggyPartitionGroupConsumer implements PartitionGroupConsumer { + + private static final Logger log = LoggerFactory.getLogger(IggyPartitionGroupConsumer.class); + + private final String clientId; + private final IggyStreamConfig config; + private final int partitionId; + + private AsyncIggyTcpClient asyncClient; + private StreamId streamId; + private TopicId topicId; + private Consumer consumer; + private boolean consumerGroupJoined; + private long currentOffset; + + /** + * Creates a new partition consumer. + * + * @param clientId unique identifier for this consumer + * @param config Iggy stream configuration + * @param partitionId the partition to consume from + */ + public IggyPartitionGroupConsumer(String clientId, IggyStreamConfig config, int partitionId) { + this.clientId = clientId; + this.config = config; + this.partitionId = partitionId; + this.consumerGroupJoined = false; + this.currentOffset = 0; + + log.info( + "Created IggyPartitionGroupConsumer: clientId={}, partition={}, config={}", + clientId, + partitionId, + config); + } + + /** + * Fetches the next batch of messages from the Iggy partition. + * This method is called repeatedly by Pinot to poll for new messages. 
+ * + * @param startOffset the offset to start consuming from (may be null) + * @param timeoutMillis timeout for the fetch operation + * @return batch of messages, or empty batch if no messages available + */ + public MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, long timeoutMillis) { Review Comment: This declaration does not match the interface; please fix it and add the `@Override` annotation. ########## foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionGroupConsumer.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iggy.connector.pinot.consumer; + +import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient; +import org.apache.iggy.connector.pinot.config.IggyStreamConfig; +import org.apache.iggy.consumergroup.Consumer; +import org.apache.iggy.identifier.ConsumerId; +import org.apache.iggy.identifier.StreamId; +import org.apache.iggy.identifier.TopicId; +import org.apache.iggy.message.Message; +import org.apache.iggy.message.PolledMessages; +import org.apache.iggy.message.PollingStrategy; +import org.apache.pinot.spi.stream.MessageBatch; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/** + * Partition-level consumer implementation for Iggy streams. + * Reads messages from a single Iggy partition using the AsyncIggyTcpClient. + * + * <p>This consumer manages: + * <ul> + * <li>TCP connection to Iggy server</li> + * <li>Consumer group membership</li> + * <li>Message polling with offset tracking</li> + * <li>Automatic offset commit for consumer group</li> + * </ul> + */ +public class IggyPartitionGroupConsumer implements PartitionGroupConsumer { + + private static final Logger log = LoggerFactory.getLogger(IggyPartitionGroupConsumer.class); + + private final String clientId; + private final IggyStreamConfig config; + private final int partitionId; + + private AsyncIggyTcpClient asyncClient; + private StreamId streamId; + private TopicId topicId; + private Consumer consumer; + private boolean consumerGroupJoined; + private long currentOffset; + + /** + * Creates a new partition consumer. 
+ * + * @param clientId unique identifier for this consumer + * @param config Iggy stream configuration + * @param partitionId the partition to consume from + */ + public IggyPartitionGroupConsumer(String clientId, IggyStreamConfig config, int partitionId) { + this.clientId = clientId; + this.config = config; + this.partitionId = partitionId; + this.consumerGroupJoined = false; + this.currentOffset = 0; + + log.info( + "Created IggyPartitionGroupConsumer: clientId={}, partition={}, config={}", + clientId, + partitionId, + config); + } + + /** + * Fetches the next batch of messages from the Iggy partition. + * This method is called repeatedly by Pinot to poll for new messages. + * + * @param startOffset the offset to start consuming from (may be null) + * @param timeoutMillis timeout for the fetch operation + * @return batch of messages, or empty batch if no messages available + */ + public MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, long timeoutMillis) { + try { + ensureConnected(); + + // No need to join consumer group when using single consumer + + // Determine starting offset + long fetchOffset = determineStartOffset(startOffset); + log.debug("Fetching messages from partition {} at offset {}", partitionId, fetchOffset); + + // Poll messages from Iggy + PolledMessages polledMessages = pollMessages(fetchOffset); + log.debug( + "Polled {} messages from partition {}", + polledMessages.messages().size(), + partitionId); + + // Convert to Pinot MessageBatch + MessageBatch batch = convertToMessageBatch(polledMessages); + return batch; + + } catch (RuntimeException e) { + log.error("Error fetching messages from partition {}: {}", partitionId, e.getMessage(), e); + return new IggyMessageBatch(new ArrayList<>()); + } + } + + /** + * Ensures TCP connection to Iggy server is established. 
+ */ + private void ensureConnected() { + if (asyncClient == null) { + log.info("Connecting to Iggy server: {}", config.getServerAddress()); + + asyncClient = AsyncIggyTcpClient.builder() + .host(config.getHost()) + .port(config.getPort()) + .credentials(config.getUsername(), config.getPassword()) + .connectionPoolSize(config.getConnectionPoolSize()) + .build(); + + // Connect and authenticate + asyncClient.connect().join(); + + // Parse stream and topic IDs + streamId = parseStreamId(config.getStreamId()); + topicId = parseTopicId(config.getTopicId()); + // Use single consumer instead of consumer group for explicit offset control + consumer = Consumer.of(ConsumerId.of(Long.valueOf(partitionId))); + + log.info("Connected to Iggy server successfully"); + } + } + + /** + * Joins the consumer group for this partition. + * This operation is idempotent in Iggy. + */ + private void joinConsumerGroup() { Review Comment: As this code does not use a consumer group, do we need this function? It's unused. ########## foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionGroupConsumer.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.pinot.consumer; + +import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient; +import org.apache.iggy.connector.pinot.config.IggyStreamConfig; +import org.apache.iggy.consumergroup.Consumer; +import org.apache.iggy.identifier.ConsumerId; +import org.apache.iggy.identifier.StreamId; +import org.apache.iggy.identifier.TopicId; +import org.apache.iggy.message.Message; +import org.apache.iggy.message.PolledMessages; +import org.apache.iggy.message.PollingStrategy; +import org.apache.pinot.spi.stream.MessageBatch; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/** + * Partition-level consumer implementation for Iggy streams. + * Reads messages from a single Iggy partition using the AsyncIggyTcpClient. + * + * <p>This consumer manages: + * <ul> + * <li>TCP connection to Iggy server</li> + * <li>Consumer group membership</li> + * <li>Message polling with offset tracking</li> + * <li>Automatic offset commit for consumer group</li> + * </ul> + */ +public class IggyPartitionGroupConsumer implements PartitionGroupConsumer { + + private static final Logger log = LoggerFactory.getLogger(IggyPartitionGroupConsumer.class); + + private final String clientId; Review Comment: `clientId` and `consumerGroupJoined` are unused; please remove them if they are not needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
