mmodzelewski commented on code in PR #2300: URL: https://github.com/apache/iggy/pull/2300#discussion_r2490106663
########## foreign/java/external-processors/iggy-connector-flink/iggy-connector-library/src/main/java/org/apache/iggy/connector/serialization/JsonSerializationSchema.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.serialization; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +import java.io.IOException; +import java.util.Optional; +import java.util.function.Function; + +/** + * Serialization schema for JSON using Jackson. + * Supports Java 8 time types and optional partition key extraction. + * + * @param <T> the type to serialize from + */ +public class JsonSerializationSchema<T> implements SerializationSchema<T> { + + private static final long serialVersionUID = 1L; + + private final transient ObjectMapper objectMapper; + private final Function<T, Integer> partitionKeyExtractor; + + /** + * Creates a new JSON serializer. 
+ */ + public JsonSerializationSchema() { + this(createDefaultObjectMapper(), null); + } + + /** + * Creates a new JSON serializer with partition key extractor. + * + * @param partitionKeyExtractor function to extract partition key from elements + */ + public JsonSerializationSchema(Function<T, Integer> partitionKeyExtractor) { + this(createDefaultObjectMapper(), partitionKeyExtractor); + } + + /** + * Creates a new JSON serializer with partition key extractor. + * Static factory method to avoid constructor ambiguity. + * + * @param partitionKeyExtractor function to extract partition key from elements + * @param <T> the type to serialize + * @return new JsonSerializationSchema instance + */ + public static <T> JsonSerializationSchema<T> withPartitionKeyExtractor( + Function<T, Integer> partitionKeyExtractor) { + return new JsonSerializationSchema<>(partitionKeyExtractor); + } + + /** + * Creates a new JSON serializer with a custom ObjectMapper. + * + * @param objectMapper the Jackson ObjectMapper to use + */ + public JsonSerializationSchema(ObjectMapper objectMapper) { + this(objectMapper, null); + } + + /** + * Creates a new JSON serializer with custom ObjectMapper and partition key extractor. 
+ * + * @param objectMapper the Jackson ObjectMapper to use + * @param partitionKeyExtractor function to extract partition key from elements + */ + public JsonSerializationSchema( + ObjectMapper objectMapper, + Function<T, Integer> partitionKeyExtractor) { + if (objectMapper == null) { + throw new IllegalArgumentException("objectMapper cannot be null"); + } + this.objectMapper = objectMapper; + this.partitionKeyExtractor = partitionKeyExtractor; + } + + @Override + public byte[] serialize(T element) throws IOException { + if (element == null) { + return null; + } + + try { + return getObjectMapper().writeValueAsBytes(element); + } catch (IOException e) { + throw new IOException( + "Failed to serialize object of type " + element.getClass().getName(), e); + } + } + + @Override + public Optional<Integer> extractPartitionKey(T element) { + if (partitionKeyExtractor != null && element != null) { + try { + Integer key = partitionKeyExtractor.apply(element); + return Optional.ofNullable(key); + } catch (Exception e) { + // Log and return empty if extraction fails Review Comment: Please add the log statement. Also change the exception type to `RuntimeException`. `Exception` might silently catch a checked exception if the implementation changes in the future. ########## foreign/java/external-processors/iggy-connector-flink/iggy-connector-library/src/main/java/org/apache/iggy/connector/flink/sink/IggySink.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.flink.sink; + +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.iggy.client.blocking.http.IggyHttpClient; +import org.apache.iggy.connector.config.IggyConnectionConfig; +import org.apache.iggy.connector.serialization.SerializationSchema; + +import java.io.IOException; +import java.io.Serializable; +import java.time.Duration; + +/** + * Flink Sink implementation for writing to Iggy streams. + * Implements the Flink Sink V2 API for integration with DataStream API. + * + * <p>Example usage: + * <pre>{@code + * events.sinkTo( + * IggySink.<Event>builder() + * .setConnectionConfig(connectionConfig) + * .setStreamId("my-stream") + * .setTopicId("my-topic") + * .setSerializer(new JsonSerializationSchema<>()) + * .setBatchSize(100) + * .setFlushInterval(Duration.ofSeconds(5)) + * .withBalancedPartitioning() + * .build() + * ).name("Iggy Sink"); + * }</pre> + * + * @param <T> the type of records to write + */ +public class IggySink<T> implements Sink<T>, Serializable { + + private static final long serialVersionUID = 1L; + + private final IggyConnectionConfig connectionConfig; + private final String streamId; + private final String topicId; + private final SerializationSchema<T> serializer; + private final int batchSize; + private final Duration flushInterval; + private final IggySinkWriter.PartitioningStrategy partitioningStrategy; + + /** + * Creates a new Iggy sink. 
+ * Use {@link #builder()} to construct instances. + * + * @param connectionConfig the connection configuration + * @param streamId the stream identifier + * @param topicId the topic identifier + * @param serializer the serialization schema + * @param batchSize the batch size for buffering + * @param flushInterval the maximum flush interval + * @param partitioningStrategy the partitioning strategy + */ + public IggySink( + IggyConnectionConfig connectionConfig, + String streamId, + String topicId, + SerializationSchema<T> serializer, + int batchSize, + Duration flushInterval, + IggySinkWriter.PartitioningStrategy partitioningStrategy) { + + this.connectionConfig = connectionConfig; + this.streamId = streamId; + this.topicId = topicId; + this.serializer = serializer; + this.batchSize = batchSize; + this.flushInterval = flushInterval; + this.partitioningStrategy = partitioningStrategy; + } + + /** + * Creates a new builder for configuring the sink. + * + * @param <T> the type of records to write + * @return a new builder instance + */ + public static <T> IggySinkBuilder<T> builder() { + return new IggySinkBuilder<>(); + } + + @Override + public SinkWriter<T> createWriter(WriterInitContext context) throws IOException { + IggyHttpClient httpClient = createHttpClient(); + return new IggySinkWriter<>( + httpClient, + streamId, + topicId, + serializer, + batchSize, + flushInterval, + partitioningStrategy); + } + + /** + * Creates an HTTP Iggy client based on connection configuration. 
+ * + * @return configured HTTP Iggy client + */ + private IggyHttpClient createHttpClient() { + try { + // Build HTTP URL from server address + String serverAddress = connectionConfig.getServerAddress(); + String httpUrl; + + // If serverAddress already has http:// or https://, use it as is + if (serverAddress.startsWith("http://") || serverAddress.startsWith("https://")) { + httpUrl = serverAddress; + } else { + // Extract host and replace TCP port 8090 with HTTP port 3000 + String host; + if (serverAddress.contains(":")) { + // Extract just the host part (before the port) + host = serverAddress.substring(0, serverAddress.indexOf(":")); + } else { + host = serverAddress; + } + // HTTP server runs on port 3000 + httpUrl = "http://" + host + ":3000"; + } + + // Create HTTP client + IggyHttpClient httpClient = new IggyHttpClient(httpUrl); + + // Login + httpClient.users().login( + connectionConfig.getUsername(), + connectionConfig.getPassword() + ); + + return httpClient; + + } catch (Exception e) { Review Comment: Please catch `RuntimeException` and add specific handling for checked exceptions if necessary ########## foreign/java/external-processors/iggy-connector-flink/docker-compose.yml: ########## @@ -0,0 +1,273 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +services: + # Iggy Message Broker + iggy: + image: apache/iggy:edge + container_name: iggy-server + ports: + - "8080:8080" # QUIC transport + - "8090:8090" # TCP transport + - "3000:3000" # HTTP API / Web UI + environment: + - IGGY_TCP_ADDRESS=0.0.0.0:8090 + - IGGY_HTTP_ADDRESS=0.0.0.0:3000 + - IGGY_QUIC_ADDRESS=0.0.0.0:8080 + - IGGY_SYSTEM_LOGGING_LEVEL=info + - IGGY_ROOT_USERNAME=iggy + - IGGY_ROOT_PASSWORD=iggy + volumes: + - iggy-data:/local_data + networks: + - flink-iggy-network + + # Flink JobManager + jobmanager: + image: flink:2.1.0-scala_2.12-java21 + container_name: flink-jobmanager + ports: + - "8081:8081" # Web UI + - "6123:6123" # RPC + user: root # Start as root to fix permissions + entrypoint: /bin/bash + command: + - -c + - | + # Create checkpoint/savepoint directories with correct permissions Review Comment: This part is already included in the custom Dockerfile under the `docker` dir, so maybe you could use the Compose `build` option to remove the duplication from the compose file. ########## foreign/java/external-processors/iggy-connector-flink/iggy-connector-library/src/main/java/org/apache/iggy/connector/flink/source/IggySourceSplitEnumerator.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.flink.source; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient; +import org.apache.iggy.connector.config.OffsetConfig; +import org.apache.iggy.identifier.StreamId; +import org.apache.iggy.identifier.TopicId; +import org.apache.iggy.partition.Partition; +import org.apache.iggy.topic.TopicDetails; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.*; + +/** + * Split enumerator for Iggy source. + * Discovers partitions and assigns them to source readers. + */ +public class IggySourceSplitEnumerator implements + SplitEnumerator<IggySourceSplit, IggySourceEnumeratorState> { + + private final SplitEnumeratorContext<IggySourceSplit> context; + private final AsyncIggyTcpClient asyncClient; + private final String streamId; + private final String topicId; + private final String consumerGroupName; + private final OffsetConfig offsetConfig; + + private final Set<IggySourceSplit> assignedSplits; + private final Set<Integer> discoveredPartitions; + private final Set<String> sentToReaders; // Track splits already sent to readers + + /** + * Creates a new split enumerator. 
+ * + * @param context the split enumerator context + * @param asyncClient the async Iggy client + * @param streamId the stream ID + * @param topicId the topic ID + * @param consumerGroupName the consumer group name + * @param offsetConfig the offset configuration + * @param initialState the initial state (for recovery) + */ + public IggySourceSplitEnumerator( + SplitEnumeratorContext<IggySourceSplit> context, + AsyncIggyTcpClient asyncClient, + String streamId, + String topicId, + String consumerGroupName, + OffsetConfig offsetConfig, + @Nullable IggySourceEnumeratorState initialState) { + + this.context = context; + this.asyncClient = asyncClient; + this.streamId = streamId; + this.topicId = topicId; + this.consumerGroupName = consumerGroupName; + this.offsetConfig = offsetConfig; + + if (initialState != null) { + this.assignedSplits = new HashSet<>(initialState.getAssignedSplits()); + this.discoveredPartitions = new HashSet<>(initialState.getDiscoveredPartitions()); + } else { + this.assignedSplits = new HashSet<>(); + this.discoveredPartitions = new HashSet<>(); + } + this.sentToReaders = new HashSet<>(); + } + + @Override + public void start() { + // Create consumer group if it doesn't exist + ensureConsumerGroupExists(); + + // Discover initial partitions + discoverPartitions(); + } + + /** + * Ensures the consumer group exists, creating it if necessary. + * + * Note: In Iggy SDK async API, consumer group creation is not available. + * Consumer groups must be created beforehand using the blocking HTTP client + * or through the setup scripts. The async API only supports joining/leaving + * consumer groups. + */ + private void ensureConsumerGroupExists() { Review Comment: This is private and empty, so can we remove it? If you want to communicate this requirement to library users, it's better to validate that the consumer group exists and throw an appropriate error if it doesn't.
########## foreign/java/external-processors/iggy-connector-flink/iggy-connector-library/src/main/java/org/apache/iggy/connector/flink/sink/IggySinkWriter.java: ########## @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.flink.sink; + +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.iggy.client.blocking.http.IggyHttpClient; +import org.apache.iggy.identifier.StreamId; +import org.apache.iggy.identifier.TopicId; +import org.apache.iggy.message.Message; +import org.apache.iggy.message.MessageHeader; +import org.apache.iggy.message.MessageId; +import org.apache.iggy.message.Partitioning; +import org.apache.iggy.connector.error.ConnectorException; +import org.apache.iggy.connector.serialization.SerializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.math.BigInteger; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.zip.CRC32; +import java.util.zip.Checksum; + +/** + * Sink writer implementation for writing records to Iggy. 
+ * Buffers records and flushes them in batches for efficiency. + */ +public class IggySinkWriter<T> implements SinkWriter<T> { + + private static final Logger LOGGER = LoggerFactory.getLogger(IggySinkWriter.class); + + private final IggyHttpClient httpClient; + private final String streamId; + private final String topicId; + private final SerializationSchema<T> serializer; + private final int batchSize; + private final Duration flushInterval; + private final PartitioningStrategy partitioningStrategy; + + private final List<T> buffer; + private long lastFlushTime; + private long totalWritten; + + /** + * Strategy for determining partition assignment. + */ + public enum PartitioningStrategy { + BALANCED, + PARTITION_ID, + MESSAGE_KEY + } + + /** + * Creates a new sink writer. + * + * @param httpClient the HTTP Iggy client + * @param streamId the stream identifier + * @param topicId the topic identifier + * @param serializer the serialization schema + * @param batchSize maximum number of records to buffer before flushing + * @param flushInterval maximum time to wait before flushing + * @param partitioningStrategy the partitioning strategy + */ + public IggySinkWriter( + IggyHttpClient httpClient, + String streamId, + String topicId, + SerializationSchema<T> serializer, + int batchSize, + Duration flushInterval, + PartitioningStrategy partitioningStrategy) { + + if (httpClient == null) { + throw new IllegalArgumentException("httpClient cannot be null"); + } + if (streamId == null || streamId.isEmpty()) { + throw new IllegalArgumentException("streamId cannot be null or empty"); + } + if (topicId == null || topicId.isEmpty()) { + throw new IllegalArgumentException("topicId cannot be null or empty"); + } + if (serializer == null) { + throw new IllegalArgumentException("serializer cannot be null"); + } + if (batchSize <= 0) { + throw new IllegalArgumentException("batchSize must be > 0"); + } + if (flushInterval == null || flushInterval.isNegative()) { + throw new 
IllegalArgumentException("flushInterval must be positive"); + } + + this.httpClient = httpClient; + this.streamId = streamId; + this.topicId = topicId; + this.serializer = serializer; + this.batchSize = batchSize; + this.flushInterval = flushInterval; + this.partitioningStrategy = partitioningStrategy != null + ? partitioningStrategy + : PartitioningStrategy.BALANCED; + + this.buffer = new ArrayList<>(batchSize); + this.lastFlushTime = System.currentTimeMillis(); + this.totalWritten = 0; + } + + @Override + public void write(T element, Context context) throws IOException { + LOGGER.info("IggySinkWriter.write() called - element: {}, buffer size: {}", + element, buffer.size()); + buffer.add(element); + + // Flush if batch size reached or flush interval exceeded + if (buffer.size() >= batchSize || shouldFlushByTime()) { + LOGGER.info("IggySinkWriter: Flushing buffer of size {}", buffer.size()); + flush(false); + } + } + + @Override + public void flush(boolean endOfInput) throws IOException { + if (buffer.isEmpty()) { + LOGGER.debug("IggySinkWriter.flush() - buffer is empty, skipping"); + return; + } + + LOGGER.info("IggySinkWriter.flush() - flushing {} messages to stream={}, topic={}", + buffer.size(), streamId, topicId); + + try { + // Serialize all buffered records + List<Message> messages = new ArrayList<>(buffer.size()); + for (T element : buffer) { + byte[] payload = serializer.serialize(element); + // Use createMessage to avoid String conversion + messages.add(createMessage(payload)); + } + + // Determine partitioning + Partitioning partitioning = determinePartitioning(); + + // Send messages to Iggy (HTTP with async wrapper) + StreamId stream = parseStreamId(streamId); + TopicId topic = parseTopicId(topicId); + + LOGGER.info("IggySinkWriter: Sending {} messages with partitioning={}", + messages.size(), partitioning); + CompletableFuture.runAsync(() -> { Review Comment: does it make sense to run this with CompletableFuture? 
The calling thread is blocked anyway with `join`, so it only adds the overhead of handing the work off to another thread. ########## foreign/java/external-processors/iggy-connector-flink/iggy-connector-library/src/main/java/org/apache/iggy/connector/flink/source/IggySource.java: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.flink.source; + +import org.apache.flink.api.connector.source.*; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient; +import org.apache.iggy.consumergroup.Consumer; +import org.apache.iggy.connector.config.IggyConnectionConfig; +import org.apache.iggy.connector.config.OffsetConfig; + +import java.io.Serializable; + +/** + * Flink Source implementation for reading from Iggy streams. + * Implements the Flink Source API for integration with DataStream API.
+ * + * <p>Example usage: + * <pre>{@code + * DataStream<Event> events = env.fromSource( + * IggySource.<Event>builder() + * .setConnectionConfig(connectionConfig) + * .setStreamId("my-stream") + * .setTopicId("my-topic") + * .setConsumerGroup("flink-job-1") + * .setDeserializer(new JsonDeserializationSchema<>(Event.class)) + * .build(), + * WatermarkStrategy.noWatermarks(), + * "Iggy Source" + * ); + * }</pre> + * + * @param <T> the type of records produced by this source + */ +public class IggySource<T> implements Source<T, IggySourceSplit, IggySourceEnumeratorState>, Serializable { + + private static final long serialVersionUID = 1L; + + private final IggyConnectionConfig connectionConfig; + private final String streamId; + private final String topicId; + private final String consumerGroupName; // Serializable representation + private final org.apache.iggy.connector.serialization.DeserializationSchema<T> deserializer; + private final OffsetConfig offsetConfig; + private final long pollBatchSize; + + /** + * Creates a new Iggy source. + * Use {@link #builder()} to construct instances. 
+ * + * @param connectionConfig the connection configuration + * @param streamId the stream identifier + * @param topicId the topic identifier + * @param consumerGroupName the consumer group name + * @param deserializer the deserialization schema + * @param offsetConfig the offset configuration + * @param pollBatchSize the number of messages to fetch per poll + */ + public IggySource( + IggyConnectionConfig connectionConfig, + String streamId, + String topicId, + String consumerGroupName, + org.apache.iggy.connector.serialization.DeserializationSchema<T> deserializer, + OffsetConfig offsetConfig, + long pollBatchSize) { + + this.connectionConfig = connectionConfig; + this.streamId = streamId; + this.topicId = topicId; + this.consumerGroupName = consumerGroupName; + this.deserializer = deserializer; + this.offsetConfig = offsetConfig; + this.pollBatchSize = pollBatchSize; + } + + /** + * Creates a new builder for configuring the source. + * + * @param <T> the type of records produced by the source + * @return a new builder instance + */ + public static <T> IggySourceBuilder<T> builder() { + return new IggySourceBuilder<>(); + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.CONTINUOUS_UNBOUNDED; + } + + @Override + public SourceReader<T, IggySourceSplit> createReader(SourceReaderContext readerContext) throws Exception { + AsyncIggyTcpClient asyncClient = createAsyncIggyClient(); + Consumer consumer = createConsumer(); + return new IggySourceReader<>( + readerContext, + asyncClient, + deserializer, + consumer, + pollBatchSize); + } + + @Override + public SplitEnumerator<IggySourceSplit, IggySourceEnumeratorState> createEnumerator( + SplitEnumeratorContext<IggySourceSplit> enumContext) throws Exception { + + AsyncIggyTcpClient asyncClient = createAsyncIggyClient(); + return new IggySourceSplitEnumerator( + enumContext, + asyncClient, + streamId, + topicId, + consumerGroupName, + offsetConfig, + null); + } + + @Override + public 
SplitEnumerator<IggySourceSplit, IggySourceEnumeratorState> restoreEnumerator( + SplitEnumeratorContext<IggySourceSplit> enumContext, + IggySourceEnumeratorState checkpoint) throws Exception { + + AsyncIggyTcpClient asyncClient = createAsyncIggyClient(); + return new IggySourceSplitEnumerator( + enumContext, + asyncClient, + streamId, + topicId, + consumerGroupName, + offsetConfig, + checkpoint); + } + + @Override + public SimpleVersionedSerializer<IggySourceSplit> getSplitSerializer() { + return new IggySourceSplitSerializer(); + } + + @Override + public SimpleVersionedSerializer<IggySourceEnumeratorState> getEnumeratorCheckpointSerializer() { + return new IggySourceEnumeratorStateSerializer(); + } + + /** + * Creates an async Iggy TCP client based on connection configuration. + * + * @return configured async Iggy client + */ + private AsyncIggyTcpClient createAsyncIggyClient() { + try { + // Parse host and port from server address + String serverAddress = connectionConfig.getServerAddress(); + String host; + int port = 8090; // Default TCP port + + if (serverAddress.contains(":")) { + String[] parts = serverAddress.split(":"); + host = parts[0]; + port = 8090; Review Comment: Is this intended? Why is the port always overwritten with the default (8090) instead of using the port given in `serverAddress`? ########## foreign/java/external-processors/iggy-connector-flink/docker-compose.yml: ########## @@ -0,0 +1,273 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +services: + # Iggy Message Broker + iggy: + image: apache/iggy:edge + container_name: iggy-server + ports: + - "8080:8080" # QUIC transport + - "8090:8090" # TCP transport + - "3000:3000" # HTTP API / Web UI + environment: + - IGGY_TCP_ADDRESS=0.0.0.0:8090 + - IGGY_HTTP_ADDRESS=0.0.0.0:3000 + - IGGY_QUIC_ADDRESS=0.0.0.0:8080 + - IGGY_SYSTEM_LOGGING_LEVEL=info + - IGGY_ROOT_USERNAME=iggy + - IGGY_ROOT_PASSWORD=iggy + volumes: + - iggy-data:/local_data + networks: + - flink-iggy-network + + # Flink JobManager + jobmanager: + image: flink:2.1.0-scala_2.12-java21 + container_name: flink-jobmanager + ports: + - "8081:8081" # Web UI + - "6123:6123" # RPC + user: root # Start as root to fix permissions + entrypoint: /bin/bash + command: + - -c + - | + # Create checkpoint/savepoint directories with correct permissions + mkdir -p /tmp/flink-checkpoints /tmp/flink-savepoints + chown -R flink:flink /tmp/flink-checkpoints /tmp/flink-savepoints + chmod -R 755 /tmp/flink-checkpoints /tmp/flink-savepoints + # Switch to flink user and run jobmanager + exec gosu flink /docker-entrypoint.sh jobmanager + environment: + - IGGY_SERVER=iggy:8090 + - | + FLINK_PROPERTIES= + jobmanager.rpc.address: jobmanager + jobmanager.memory.process.size: 1600m + state.backend.type: hashmap + state.checkpoints.dir: file:///tmp/flink-checkpoints + state.savepoints.dir: file:///tmp/flink-savepoints + execution.checkpointing.interval: 60000 + execution.checkpointing.mode: EXACTLY_ONCE + execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION + parallelism.default: 2 + 
taskmanager.numberOfTaskSlots: 4 + volumes: + - flink-checkpoints:/tmp/flink-checkpoints + - flink-savepoints:/tmp/flink-savepoints + # Mount your fat JARs here + - ./iggy-flink-examples/build/libs:/opt/flink/usrlib + # OR if you need shared connector libraries: + # - ./lib/iggy-connector.jar:/opt/flink/lib/iggy-connector.jar + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8081"] + interval: 10s + timeout: 5s + retries: 5 + networks: + - flink-iggy-network + depends_on: + - iggy + + # Flink TaskManager + taskmanager: + image: flink:2.1.0-scala_2.12-java21 + container_name: flink-taskmanager + user: root # Start as root to fix permissions + entrypoint: /bin/bash + command: + - -c + - | + # Create checkpoint/savepoint directories with correct permissions + mkdir -p /tmp/flink-checkpoints /tmp/flink-savepoints + chown -R flink:flink /tmp/flink-checkpoints /tmp/flink-savepoints + chmod -R 755 /tmp/flink-checkpoints /tmp/flink-savepoints + # Switch to flink user and run taskmanager + exec gosu flink /docker-entrypoint.sh taskmanager + environment: + - IGGY_SERVER=iggy:8090 + - | + FLINK_PROPERTIES= + jobmanager.rpc.address: jobmanager + taskmanager.memory.process.size: 1728m + taskmanager.numberOfTaskSlots: 4 + volumes: + - flink-checkpoints:/tmp/flink-checkpoints + - flink-savepoints:/tmp/flink-savepoints + # Same library mounts as jobmanager if needed + # - ./lib/iggy-connector.jar:/opt/flink/lib/iggy-connector.jar + networks: + - flink-iggy-network + depends_on: + - jobmanager + + # Iggy Setup - Creates streams and topics + iggy-setup: + image: alpine:latest + container_name: iggy-setup + depends_on: + - iggy + networks: + - flink-iggy-network + entrypoint: /bin/sh + command: + - -c + - | + # Install dependencies + echo "Installing dependencies..." + apk add --no-cache curl jq + + echo "Waiting for Iggy to be ready..." 
+ # Wait with health check instead of blind sleep + MAX_RETRIES=30 + RETRY=0 + while [ $$RETRY -lt $$MAX_RETRIES ]; do + if curl -f -s http://iggy:3000/stats > /dev/null 2>&1; then + echo "Iggy is ready!" + break + fi + RETRY=$$((RETRY + 1)) + echo "Waiting for Iggy... ($$RETRY/$$MAX_RETRIES)" + sleep 2 + done + + if [ $$RETRY -eq $$MAX_RETRIES ]; then + echo "Iggy failed to start within timeout" + exit 1 + fi + + echo "Authenticating with Iggy..." + RESPONSE=$$(curl -s -X POST http://iggy:3000/users/login \ + -H "Content-Type: application/json" \ + -d '{"username":"iggy","password":"iggy"}') + + TOKEN=$$(echo "$$RESPONSE" | jq -r '.access_token.token') + + if [ -z "$$TOKEN" ] || [ "$$TOKEN" = "null" ]; then + echo "Failed to authenticate" + echo "Response was: $$RESPONSE" + exit 1 + fi + + echo "Authentication successful!" + echo "Creating streams and topics for examples..." + + # StreamTransformJob streams + echo "Creating 'sensors' stream (ID: 1)..." + curl -s -X POST http://iggy:3000/streams \ + -H "Content-Type: application/json" \ + -H "Authorization: Bearer $$TOKEN" \ + -d '{"stream_id": 1, "name": "sensors"}' || true Review Comment: Please remove numeric ids from the creation requests. Once https://github.com/apache/iggy/pull/2299 is merged, it won't be possible to create streams/topics/consumer groups with numeric id ########## foreign/java/external-processors/iggy-connector-flink/iggy-connector-library/src/main/java/org/apache/iggy/connector/flink/sink/IggySinkWriter.java: ########## @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.connector.flink.sink; + +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.iggy.client.blocking.http.IggyHttpClient; +import org.apache.iggy.identifier.StreamId; +import org.apache.iggy.identifier.TopicId; +import org.apache.iggy.message.Message; +import org.apache.iggy.message.MessageHeader; +import org.apache.iggy.message.MessageId; +import org.apache.iggy.message.Partitioning; +import org.apache.iggy.connector.error.ConnectorException; +import org.apache.iggy.connector.serialization.SerializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.math.BigInteger; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.zip.CRC32; +import java.util.zip.Checksum; + +/** + * Sink writer implementation for writing records to Iggy. + * Buffers records and flushes them in batches for efficiency. 
+ */ +public class IggySinkWriter<T> implements SinkWriter<T> { + + private static final Logger LOGGER = LoggerFactory.getLogger(IggySinkWriter.class); + + private final IggyHttpClient httpClient; + private final String streamId; + private final String topicId; + private final SerializationSchema<T> serializer; + private final int batchSize; + private final Duration flushInterval; + private final PartitioningStrategy partitioningStrategy; + + private final List<T> buffer; + private long lastFlushTime; + private long totalWritten; + + /** + * Strategy for determining partition assignment. + */ + public enum PartitioningStrategy { + BALANCED, + PARTITION_ID, + MESSAGE_KEY + } + + /** + * Creates a new sink writer. + * + * @param httpClient the HTTP Iggy client + * @param streamId the stream identifier + * @param topicId the topic identifier + * @param serializer the serialization schema + * @param batchSize maximum number of records to buffer before flushing + * @param flushInterval maximum time to wait before flushing + * @param partitioningStrategy the partitioning strategy + */ + public IggySinkWriter( + IggyHttpClient httpClient, + String streamId, + String topicId, + SerializationSchema<T> serializer, + int batchSize, + Duration flushInterval, + PartitioningStrategy partitioningStrategy) { + + if (httpClient == null) { + throw new IllegalArgumentException("httpClient cannot be null"); + } + if (streamId == null || streamId.isEmpty()) { + throw new IllegalArgumentException("streamId cannot be null or empty"); + } + if (topicId == null || topicId.isEmpty()) { + throw new IllegalArgumentException("topicId cannot be null or empty"); + } + if (serializer == null) { + throw new IllegalArgumentException("serializer cannot be null"); + } + if (batchSize <= 0) { + throw new IllegalArgumentException("batchSize must be > 0"); + } + if (flushInterval == null || flushInterval.isNegative()) { + throw new IllegalArgumentException("flushInterval must be positive"); + } + + 
this.httpClient = httpClient; + this.streamId = streamId; + this.topicId = topicId; + this.serializer = serializer; + this.batchSize = batchSize; + this.flushInterval = flushInterval; + this.partitioningStrategy = partitioningStrategy != null + ? partitioningStrategy + : PartitioningStrategy.BALANCED; + + this.buffer = new ArrayList<>(batchSize); + this.lastFlushTime = System.currentTimeMillis(); + this.totalWritten = 0; + } + + @Override + public void write(T element, Context context) throws IOException { + LOGGER.info("IggySinkWriter.write() called - element: {}, buffer size: {}", Review Comment: The log will be quite noisy if we log every sent item. I'd rather change this to a debug level, same with the flush. ########## foreign/java/external-processors/iggy-connector-flink/iggy-connector-library/src/main/java/org/apache/iggy/connector/flink/source/IggyPartitionSplitReader.java: ########## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iggy.connector.flink.source; + +import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient; +import org.apache.iggy.consumergroup.Consumer; +import org.apache.iggy.identifier.StreamId; +import org.apache.iggy.identifier.TopicId; +import org.apache.iggy.message.Message; +import org.apache.iggy.message.PolledMessages; +import org.apache.iggy.message.PollingStrategy; +import org.apache.iggy.connector.error.ConnectorException; +import org.apache.iggy.connector.serialization.DeserializationSchema; +import org.apache.iggy.connector.serialization.RecordMetadata; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Reads messages from a single Iggy partition. + * Manages offset tracking and message deserialization. + */ +public class IggyPartitionSplitReader<T> { + private static final Logger LOGGER = LoggerFactory.getLogger(IggyPartitionSplitReader.class); + + private final AsyncIggyTcpClient asyncClient; + private final IggySourceSplit split; + private final DeserializationSchema<T> deserializer; + private final Consumer consumer; + private final long pollBatchSize; + + private long currentOffset; + private boolean hasMoreData; + private boolean consumerGroupJoined; + + /** + * Creates a new partition split reader. 
+ * + * @param asyncClient the async Iggy client + * @param split the source split to read from + * @param deserializer the deserialization schema + * @param consumer the consumer identifier + * @param pollBatchSize number of messages to fetch per poll + */ + public IggyPartitionSplitReader( + AsyncIggyTcpClient asyncClient, + IggySourceSplit split, + DeserializationSchema<T> deserializer, + Consumer consumer, + long pollBatchSize) { + + if (asyncClient == null) { + throw new IllegalArgumentException("asyncClient cannot be null"); + } + if (split == null) { + throw new IllegalArgumentException("split cannot be null"); + } + if (deserializer == null) { + throw new IllegalArgumentException("deserializer cannot be null"); + } + if (consumer == null) { + throw new IllegalArgumentException("consumer cannot be null"); + } + if (pollBatchSize <= 0) { + throw new IllegalArgumentException("pollBatchSize must be > 0"); + } + + this.asyncClient = asyncClient; + this.split = split; + this.deserializer = deserializer; + this.consumer = consumer; + this.pollBatchSize = pollBatchSize; + this.currentOffset = split.getCurrentOffset(); + this.hasMoreData = true; + this.consumerGroupJoined = false; + } + + /** + * Polls messages from the partition. 
+ * + * @return list of deserialized records, empty if no messages available + * @throws ConnectorException if polling or deserialization fails + */ + public List<T> poll() { + // For unbounded streams, always poll (don't check hasMoreData) + + try { + StreamId streamId = parseStreamId(split.getStreamId()); + TopicId topicId = parseTopicId(split.getTopicId()); + + // Join consumer group on first poll (idempotent operation in Iggy) + if (!consumerGroupJoined) { + LOGGER.info("IggyPartitionSplitReader: Joining consumer group for stream={}, " + + "topic={}, consumer={}, partition={}", + split.getStreamId(), split.getTopicId(), consumer.id(), + split.getPartitionId()); + asyncClient.consumerGroups() + .joinConsumerGroup(streamId, topicId, consumer.id()) + .join(); + consumerGroupJoined = true; + LOGGER.info("IggyPartitionSplitReader: Successfully joined consumer group"); + } + Optional<Long> partitionId = Optional.of((long) split.getPartitionId()); + + // CRITICAL FIX: Use consumer group managed offset (next available message) + // When using consumer groups, Iggy manages offsets automatically. + // Using PollingStrategy.offset() conflicts with consumer group offset management. 
+ PollingStrategy strategy = PollingStrategy.next(); + + LOGGER.info("IggyPartitionSplitReader: Polling partition={}, " + + "strategy=NEXT (consumer-group-managed), batchSize={}", + split.getPartitionId(), pollBatchSize); + + // CRITICAL FIX: Enable autoCommit to advance consumer group offset after each poll + // Without this, offset never advances and we read the same messages repeatedly + // Poll messages from Iggy (async with blocking) + PolledMessages polledMessages = asyncClient.messages() + .pollMessagesAsync(streamId, topicId, partitionId, consumer, strategy, + pollBatchSize, true) + .join(); + + LOGGER.info("IggyPartitionSplitReader: Polled partition={}, messagesCount={}, " + + "currentOffset={}", + split.getPartitionId(), polledMessages.messages().size(), + polledMessages.currentOffset() != null + ? polledMessages.currentOffset().longValue() : "null"); + + if (polledMessages.messages().isEmpty()) { + // For unbounded streams, keep polling even if no messages available + // Don't set hasMoreData = false + return List.of(); + } + + // Deserialize messages + List<T> records = new ArrayList<>(polledMessages.messages().size()); + for (Message message : polledMessages.messages()) { + T record = deserializeMessage(message); + if (record != null) { + LOGGER.info("IggyPartitionSplitReader: Deserialized message at " + + "offset={}, record={}", + message.header().offset(), record); + records.add(record); + } + } + + // Update current offset + if (polledMessages.currentOffset() != null) { + currentOffset = polledMessages.currentOffset().longValue() + 1; + LOGGER.info("IggyPartitionSplitReader: Updated currentOffset to {}", + currentOffset); + } + + return records; + + } catch (Exception e) { + throw new ConnectorException( + "Failed to poll messages from partition " + split.getPartitionId(), + e, + ConnectorException.ErrorCode.RESOURCE_EXHAUSTED, + true); + } + } + + /** + * Deserializes a single message. 
+ * + * @param message the Iggy message + * @return deserialized record, or null if deserialization fails (logs warning) + */ + private T deserializeMessage(Message message) { + try { + RecordMetadata metadata = RecordMetadata.of( + split.getStreamId(), + split.getTopicId(), + split.getPartitionId(), + message.header().offset().longValue()); + + return deserializer.deserialize(message.payload(), metadata); + + } catch (IOException e) { + // Log warning and skip this message (can be made configurable) + System.err.println("Failed to deserialize message at offset " Review Comment: please use logger instead of writing to stderr ########## foreign/java/external-processors/iggy-connector-flink/iggy-connector-library/src/main/java/org/apache/iggy/connector/flink/sink/IggySink.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iggy.connector.flink.sink; + +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.iggy.client.blocking.http.IggyHttpClient; +import org.apache.iggy.connector.config.IggyConnectionConfig; +import org.apache.iggy.connector.serialization.SerializationSchema; + +import java.io.IOException; +import java.io.Serializable; +import java.time.Duration; + +/** + * Flink Sink implementation for writing to Iggy streams. + * Implements the Flink Sink V2 API for integration with DataStream API. + * + * <p>Example usage: + * <pre>{@code + * events.sinkTo( + * IggySink.<Event>builder() + * .setConnectionConfig(connectionConfig) + * .setStreamId("my-stream") + * .setTopicId("my-topic") + * .setSerializer(new JsonSerializationSchema<>()) + * .setBatchSize(100) + * .setFlushInterval(Duration.ofSeconds(5)) + * .withBalancedPartitioning() + * .build() + * ).name("Iggy Sink"); + * }</pre> + * + * @param <T> the type of records to write + */ +public class IggySink<T> implements Sink<T>, Serializable { + + private static final long serialVersionUID = 1L; + + private final IggyConnectionConfig connectionConfig; + private final String streamId; + private final String topicId; + private final SerializationSchema<T> serializer; + private final int batchSize; + private final Duration flushInterval; + private final IggySinkWriter.PartitioningStrategy partitioningStrategy; + + /** + * Creates a new Iggy sink. + * Use {@link #builder()} to construct instances. 
+ * + * @param connectionConfig the connection configuration + * @param streamId the stream identifier + * @param topicId the topic identifier + * @param serializer the serialization schema + * @param batchSize the batch size for buffering + * @param flushInterval the maximum flush interval + * @param partitioningStrategy the partitioning strategy + */ + public IggySink( + IggyConnectionConfig connectionConfig, + String streamId, + String topicId, + SerializationSchema<T> serializer, + int batchSize, + Duration flushInterval, + IggySinkWriter.PartitioningStrategy partitioningStrategy) { + + this.connectionConfig = connectionConfig; + this.streamId = streamId; + this.topicId = topicId; + this.serializer = serializer; + this.batchSize = batchSize; + this.flushInterval = flushInterval; + this.partitioningStrategy = partitioningStrategy; + } + + /** + * Creates a new builder for configuring the sink. + * + * @param <T> the type of records to write + * @return a new builder instance + */ + public static <T> IggySinkBuilder<T> builder() { + return new IggySinkBuilder<>(); + } + + @Override + public SinkWriter<T> createWriter(WriterInitContext context) throws IOException { + IggyHttpClient httpClient = createHttpClient(); + return new IggySinkWriter<>( + httpClient, + streamId, + topicId, + serializer, + batchSize, + flushInterval, + partitioningStrategy); + } + + /** + * Creates an HTTP Iggy client based on connection configuration. 
+ * + * @return configured HTTP Iggy client + */ + private IggyHttpClient createHttpClient() { + try { + // Build HTTP URL from server address + String serverAddress = connectionConfig.getServerAddress(); + String httpUrl; + + // If serverAddress already has http:// or https://, use it as is Review Comment: I think the code below could be simplified a lot with java's `URL` class ########## foreign/java/external-processors/iggy-connector-flink/iggy-connector-library/src/main/java/org/apache/iggy/connector/flink/source/IggySourceReader.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iggy.connector.flink.source; + +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.core.io.InputStatus; +import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient; +import org.apache.iggy.consumergroup.Consumer; +import org.apache.iggy.connector.serialization.DeserializationSchema; + +import java.util.*; +import java.util.concurrent.CompletableFuture; + +/** + * Flink SourceReader implementation for Iggy. + * Manages multiple partition split readers and coordinates reading from them. + */ +public class IggySourceReader<T> implements SourceReader<T, IggySourceSplit> { + + private final SourceReaderContext context; + private final AsyncIggyTcpClient asyncClient; + private final DeserializationSchema<T> deserializer; + private final Consumer consumer; + private final long pollBatchSize; + + private final Map<String, IggyPartitionSplitReader<T>> splitReaders; + private final Queue<T> recordQueue; + + private boolean noMoreSplits; + + /** + * Creates a new Iggy source reader. 
+ * + * @param context the source reader context + * @param asyncClient the async Iggy client + * @param deserializer the deserialization schema + * @param consumer the consumer identifier + * @param pollBatchSize number of messages to fetch per poll + */ + public IggySourceReader( + SourceReaderContext context, + AsyncIggyTcpClient asyncClient, + DeserializationSchema<T> deserializer, + Consumer consumer, + long pollBatchSize) { + + this.context = context; + this.asyncClient = asyncClient; + this.deserializer = deserializer; + this.consumer = consumer; + this.pollBatchSize = pollBatchSize; + this.splitReaders = new HashMap<>(); + this.recordQueue = new ArrayDeque<>(); + this.noMoreSplits = false; + } + + @Override + public void start() { + // Initialization if needed + } + + @Override + public InputStatus pollNext(ReaderOutput<T> output) throws Exception { + // If we have records in the queue, emit them + if (!recordQueue.isEmpty()) { + T record = recordQueue.poll(); + output.collect(record); + return InputStatus.MORE_AVAILABLE; + } + + // Poll from all split readers + boolean hasData = false; + for (IggyPartitionSplitReader<T> splitReader : splitReaders.values()) { + List<T> records = splitReader.poll(); + if (!records.isEmpty()) { + recordQueue.addAll(records); + hasData = true; + } + } + + // If we got data, emit the first record + if (hasData) { + T record = recordQueue.poll(); + output.collect(record); + return InputStatus.MORE_AVAILABLE; + } + + // No data available + if (noMoreSplits && splitReaders.isEmpty()) { + return InputStatus.END_OF_INPUT; + } + + return InputStatus.NOTHING_AVAILABLE; + } + + @Override + public List<IggySourceSplit> snapshotState(long checkpointId) { + // Return current state of all splits for checkpointing + List<IggySourceSplit> splits = new ArrayList<>(); + for (IggyPartitionSplitReader<T> splitReader : splitReaders.values()) { + splits.add(splitReader.getSplit()); + } + return splits; + } + + @Override + public 
CompletableFuture<Void> isAvailable() { + // If we have data in queue, immediately available + if (!recordQueue.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + // Check if any split reader has more data + for (IggyPartitionSplitReader<T> splitReader : splitReaders.values()) { + if (splitReader.hasMoreData()) { + return CompletableFuture.completedFuture(null); + } + } + + // No data available right now, return a future that will complete later + // For simplicity, we return completed future and rely on polling + return CompletableFuture.completedFuture(null); + } + + @Override + public void addSplits(List<IggySourceSplit> splits) { + for (IggySourceSplit split : splits) { + String splitId = split.splitId(); + if (!splitReaders.containsKey(splitId)) { + IggyPartitionSplitReader<T> splitReader = new IggyPartitionSplitReader<>( + asyncClient, + split, + deserializer, + consumer, + pollBatchSize); + splitReaders.put(splitId, splitReader); + } + } + } + + @Override + public void notifyNoMoreSplits() { + noMoreSplits = true; + } + + @Override + public void handleSourceEvents(SourceEvent sourceEvent) { + // Handle custom source events if needed + } + + @Override + public void close() throws Exception { + // Clean up resources + splitReaders.clear(); + recordQueue.clear(); + + // CRITICAL FIX: Close async client to shutdown EventLoopGroup and prevent memory leak + if (asyncClient != null) { + try { + asyncClient.close().join(); + } catch (Exception e) { + // Log but don't throw - we're in cleanup phase + System.err.println("Error closing async Iggy client: " + e.getMessage()); Review Comment: please change to logger and catching RuntimeException -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
