mmodzelewski commented on code in PR #2300:
URL: https://github.com/apache/iggy/pull/2300#discussion_r2491720866


##########
foreign/java/external-processors/iggy-connector-flink/iggy-connector-library/src/main/java/org/apache/iggy/connector/flink/source/IggyPartitionSplitReader.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.flink.source;
+
+import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
+import org.apache.iggy.consumergroup.Consumer;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.PolledMessages;
+import org.apache.iggy.message.PollingStrategy;
+import org.apache.iggy.connector.error.ConnectorException;
+import org.apache.iggy.connector.serialization.DeserializationSchema;
+import org.apache.iggy.connector.serialization.RecordMetadata;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reads messages from a single Iggy partition.
+ * Manages offset tracking and message deserialization.
+ */
+public class IggyPartitionSplitReader<T> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(IggyPartitionSplitReader.class);
+
+    private final AsyncIggyTcpClient asyncClient;
+    private final IggySourceSplit split;
+    private final DeserializationSchema<T> deserializer;
+    private final Consumer consumer;
+    private final long pollBatchSize;
+
+    private long currentOffset;
+    private boolean hasMoreData;
+    private boolean consumerGroupJoined;
+
+    /**
+     * Creates a new partition split reader.
+     *
+     * @param asyncClient the async Iggy client
+     * @param split the source split to read from
+     * @param deserializer the deserialization schema
+     * @param consumer the consumer identifier
+     * @param pollBatchSize number of messages to fetch per poll
+     */
+    public IggyPartitionSplitReader(
+            AsyncIggyTcpClient asyncClient,
+            IggySourceSplit split,
+            DeserializationSchema<T> deserializer,
+            Consumer consumer,
+            long pollBatchSize) {
+
+        if (asyncClient == null) {
+            throw new IllegalArgumentException("asyncClient cannot be null");
+        }
+        if (split == null) {
+            throw new IllegalArgumentException("split cannot be null");
+        }
+        if (deserializer == null) {
+            throw new IllegalArgumentException("deserializer cannot be null");
+        }
+        if (consumer == null) {
+            throw new IllegalArgumentException("consumer cannot be null");
+        }
+        if (pollBatchSize <= 0) {
+            throw new IllegalArgumentException("pollBatchSize must be > 0");
+        }
+
+        this.asyncClient = asyncClient;
+        this.split = split;
+        this.deserializer = deserializer;
+        this.consumer = consumer;
+        this.pollBatchSize = pollBatchSize;
+        this.currentOffset = split.getCurrentOffset();
+        this.hasMoreData = true;
+        this.consumerGroupJoined = false;
+    }
+
+    /**
+     * Polls messages from the partition.
+     *
+     * @return list of deserialized records, empty if no messages available
+     * @throws ConnectorException if polling or deserialization fails
+     */
+    public List<T> poll() {
+        // For unbounded streams, always poll (don't check hasMoreData)
+
+        try {
+            StreamId streamId = parseStreamId(split.getStreamId());
+            TopicId topicId = parseTopicId(split.getTopicId());
+
+            // Join consumer group on first poll (idempotent operation in Iggy)
+            if (!consumerGroupJoined) {
+                LOGGER.info("IggyPartitionSplitReader: Joining consumer group for stream={}, "
+                                + "topic={}, consumer={}, partition={}",
+                        split.getStreamId(), split.getTopicId(), consumer.id(),
+                        split.getPartitionId());
+                asyncClient.consumerGroups()
+                        .joinConsumerGroup(streamId, topicId, consumer.id())
+                        .join();
+                consumerGroupJoined = true;
+                LOGGER.info("IggyPartitionSplitReader: Successfully joined consumer group");
+            }
+            Optional<Long> partitionId = Optional.of((long) split.getPartitionId());
+
+            // CRITICAL FIX: Use consumer group managed offset (next available message)

Review Comment:
   Not in place, but a test case would probably be best. We'd then have
"living documentation".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to