This is an automated email from the ASF dual-hosted git repository.

maciej pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new efd8809f8 feat(connector): add iggy-pinot external connector (#2499)
efd8809f8 is described below

commit efd8809f8ee6c9c0576534a245a2698847373aa3
Author: Chiradip Mandal <[email protected]>
AuthorDate: Wed Jan 21 00:39:59 2026 -0800

    feat(connector): add iggy-pinot external connector (#2499)
    
    Co-authored-by: Maciej Modzelewski <[email protected]>
---
 .../iggy-connector-pinot/build.gradle.kts          |  71 ++++++
 .../iggy-connector-pinot/deployment/schema.json    |  31 +++
 .../iggy-connector-pinot/deployment/table.json     |  42 ++++
 .../iggy-connector-pinot/docker-compose.yml        | 135 +++++++++++
 .../examples/sample-messages.json                  |  47 ++++
 .../iggy-connector-pinot/examples/schema.json      |  59 +++++
 .../examples/table-config.json                     |  43 ++++
 .../iggy-connector-pinot/integration-test.sh       | 234 ++++++++++++++++++
 .../connector/pinot/config/IggyStreamConfig.java   | 206 ++++++++++++++++
 .../pinot/consumer/IggyConsumerFactory.java        | 116 +++++++++
 .../connector/pinot/consumer/IggyMessageBatch.java | 130 ++++++++++
 .../pinot/consumer/IggyPartitionGroupConsumer.java | 265 +++++++++++++++++++++
 .../pinot/consumer/IggyPartitionLevelConsumer.java |  49 ++++
 .../consumer/IggyStreamPartitionMsgOffset.java     |  80 +++++++
 .../pinot/decoder/IggyJsonMessageDecoder.java      |  97 ++++++++
 .../pinot/metadata/IggyStreamMetadataProvider.java | 243 +++++++++++++++++++
 ...g.apache.pinot.spi.stream.StreamConsumerFactory |  16 ++
 .../src/main/resources/pinot-plugin.properties     |  31 +++
 .../pinot/config/IggyStreamConfigTest.java         | 193 +++++++++++++++
 .../pinot/consumer/IggyMessageBatchTest.java       | 123 ++++++++++
 .../consumer/IggyStreamPartitionMsgOffsetTest.java |  90 +++++++
 foreign/java/gradle/libs.versions.toml             |   8 +
 foreign/java/settings.gradle.kts                   |   3 +
 23 files changed, 2312 insertions(+)

diff --git 
a/foreign/java/external-processors/iggy-connector-pinot/build.gradle.kts 
b/foreign/java/external-processors/iggy-connector-pinot/build.gradle.kts
new file mode 100644
index 000000000..89e986978
--- /dev/null
+++ b/foreign/java/external-processors/iggy-connector-pinot/build.gradle.kts
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+    id("iggy.java-library-conventions")
+}
+
+dependencies {
+    // Iggy SDK - use local project when building within Iggy repository
+    api(project(":iggy"))
+
+    // Apache Pinot dependencies (provided - not bundled with connector)
+    compileOnly(libs.pinot.spi)
+
+    // Serialization support - use Jackson 2.x for Pinot compatibility
+    implementation(libs.jackson2.databind) {
+        exclude(group = "tools.jackson.core")
+    }
+
+    // Apache Commons
+    implementation(libs.commons.lang3)
+
+    // Logging
+    compileOnly(libs.slf4j.api)
+
+    // Testing
+    testImplementation(platform(libs.junit.bom))
+    testImplementation(libs.bundles.testing)
+    testImplementation(libs.pinot.spi) // Need Pinot SPI for tests
+    testRuntimeOnly(libs.slf4j.simple)
+}
+
+// Task to copy runtime dependencies for Docker deployment (flattened into 
libs directory)
+tasks.register<Copy>("copyDependencies") {
+    from(configurations.runtimeClasspath)
+    into(layout.buildDirectory.dir("libs"))
+}
+
+// Make jar task depend on copyDependencies
+tasks.named("jar") {
+    finalizedBy("copyDependencies")
+}
+
+publishing {
+    publications {
+        named<MavenPublication>("maven") {
+            artifactId = "pinot-connector"
+
+            pom {
+                name = "Apache Iggy - Pinot Connector"
+                description = "Apache Iggy connector plugin for Apache Pinot 
stream ingestion"
+            }
+        }
+    }
+}
diff --git 
a/foreign/java/external-processors/iggy-connector-pinot/deployment/schema.json 
b/foreign/java/external-processors/iggy-connector-pinot/deployment/schema.json
new file mode 100644
index 000000000..d7ff8496c
--- /dev/null
+++ 
b/foreign/java/external-processors/iggy-connector-pinot/deployment/schema.json
@@ -0,0 +1,31 @@
+{
+  "schemaName": "test_events",
+  "dimensionFieldSpecs": [
+    {
+      "name": "userId",
+      "dataType": "STRING"
+    },
+    {
+      "name": "eventType",
+      "dataType": "STRING"
+    },
+    {
+      "name": "deviceType",
+      "dataType": "STRING"
+    }
+  ],
+  "metricFieldSpecs": [
+    {
+      "name": "duration",
+      "dataType": "LONG"
+    }
+  ],
+  "dateTimeFieldSpecs": [
+    {
+      "name": "timestamp",
+      "dataType": "LONG",
+      "format": "1:MILLISECONDS:EPOCH",
+      "granularity": "1:MILLISECONDS"
+    }
+  ]
+}
diff --git 
a/foreign/java/external-processors/iggy-connector-pinot/deployment/table.json 
b/foreign/java/external-processors/iggy-connector-pinot/deployment/table.json
new file mode 100644
index 000000000..f6af85365
--- /dev/null
+++ 
b/foreign/java/external-processors/iggy-connector-pinot/deployment/table.json
@@ -0,0 +1,42 @@
+{
+  "tableName": "test_events",
+  "tableType": "REALTIME",
+  "segmentsConfig": {
+    "timeColumnName": "timestamp",
+    "timeType": "MILLISECONDS",
+    "replication": "1",
+    "schemaName": "test_events"
+  },
+  "tenants": {
+    "broker": "DefaultTenant",
+    "server": "DefaultTenant"
+  },
+  "tableIndexConfig": {
+    "loadMode": "MMAP",
+    "streamConfigs": {
+      "streamType": "iggy",
+      "stream.iggy.topic.name": "test-events",
+      "stream.iggy.consumer.type": "lowlevel",
+      "stream.iggy.consumer.factory.class.name": 
"org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory",
+      "realtime.segment.consumer.factory.class.name": 
"org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory",
+      "stream.iggy.decoder.class.name": 
"org.apache.iggy.connector.pinot.decoder.IggyJsonMessageDecoder",
+
+      "stream.iggy.host": "iggy",
+      "stream.iggy.port": "8090",
+      "stream.iggy.username": "iggy",
+      "stream.iggy.password": "iggy",
+
+      "stream.iggy.stream.id": "test-stream",
+      "stream.iggy.topic.id": "test-events",
+      "stream.iggy.consumer.group": "pinot-integration-test",
+
+      "stream.iggy.poll.batch.size": "100",
+      "stream.iggy.connection.pool.size": "4",
+      "stream.iggy.consumer.prop.auto.offset.reset": "smallest",
+
+      "realtime.segment.flush.threshold.rows": "1000",
+      "realtime.segment.flush.threshold.time": "600000"
+    }
+  },
+  "metadata": {}
+}
diff --git 
a/foreign/java/external-processors/iggy-connector-pinot/docker-compose.yml 
b/foreign/java/external-processors/iggy-connector-pinot/docker-compose.yml
new file mode 100644
index 000000000..889338fd2
--- /dev/null
+++ b/foreign/java/external-processors/iggy-connector-pinot/docker-compose.yml
@@ -0,0 +1,135 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+services:
+  # Apache Iggy Server (from official Apache repo)
+  iggy:
+    image: apache/iggy:latest
+    container_name: iggy-server
+    ports:
+      - "8090:8090"  # TCP
+      - "3000:3000"  # HTTP
+      - "8080:8080"  # QUIC
+    environment:
+      - IGGY_SYSTEM_LOGGING_LEVEL=info
+      - IGGY_TCP_ADDRESS=0.0.0.0:8090
+      - IGGY_HTTP_ENABLED=true
+      - IGGY_HTTP_ADDRESS=0.0.0.0:3000
+      - IGGY_QUIC_ADDRESS=0.0.0.0:8080
+      - IGGY_ROOT_USERNAME=iggy
+      - IGGY_ROOT_PASSWORD=iggy
+    cap_add:
+      - SYS_NICE
+    security_opt:
+      - seccomp:unconfined
+    ulimits:
+      memlock:
+        soft: -1
+        hard: -1
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:3000/"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+      start_period: 10s
+    networks:
+      - iggy-pinot-network
+
+  # Zookeeper (required by Pinot)
+  zookeeper:
+    image: zookeeper:3.9
+    container_name: pinot-zookeeper
+    ports:
+      - "2181:2181"
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+    networks:
+      - iggy-pinot-network
+
+  # Apache Pinot Controller
+  pinot-controller:
+    image: apachepinot/pinot:latest
+    container_name: pinot-controller
+    command: "StartController -zkAddress zookeeper:2181"
+    ports:
+      - "9000:9000"
+    environment:
+      JAVA_OPTS: "-Xms1G -Xmx2G -XX:+UseG1GC"
+    depends_on:
+      - zookeeper
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:9000/health"]
+      interval: 10s
+      timeout: 5s
+      retries: 10
+      start_period: 30s
+    volumes:
+      - ./build/libs:/opt/pinot/plugins/pinot-stream-ingestion/iggy-connector
+      - 
../../java-sdk/build/libs/iggy-0.6.0.jar:/opt/pinot/plugins/pinot-stream-ingestion/iggy-connector/iggy-0.6.0.jar
+      - ./deployment:/opt/pinot/deployment
+    networks:
+      - iggy-pinot-network
+
+  # Apache Pinot Broker
+  pinot-broker:
+    image: apachepinot/pinot:latest
+    container_name: pinot-broker
+    command: "StartBroker -zkAddress zookeeper:2181"
+    ports:
+      - "8099:8099"
+    environment:
+      JAVA_OPTS: "-Xms512M -Xmx1G -XX:+UseG1GC"
+    depends_on:
+      - pinot-controller
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:8099/health"]
+      interval: 10s
+      timeout: 5s
+      retries: 10
+      start_period: 30s
+    networks:
+      - iggy-pinot-network
+
+  # Apache Pinot Server
+  pinot-server:
+    image: apachepinot/pinot:latest
+    container_name: pinot-server
+    command: "StartServer -zkAddress zookeeper:2181"
+    ports:
+      - "8098:8098"
+      - "8097:8097"
+    environment:
+      JAVA_OPTS: "-Xms1G -Xmx2G -XX:+UseG1GC -Dplugins.include=iggy-connector"
+    depends_on:
+      - pinot-broker
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:8097/health"]
+      interval: 10s
+      timeout: 5s
+      retries: 10
+      start_period: 30s
+    volumes:
+      - ./build/libs:/opt/pinot/plugins/pinot-stream-ingestion/iggy-connector
+      - 
../../java-sdk/build/libs/iggy-0.6.0.jar:/opt/pinot/plugins/pinot-stream-ingestion/iggy-connector/iggy-0.6.0.jar
+      - ./deployment:/opt/pinot/deployment
+    networks:
+      - iggy-pinot-network
+
+networks:
+  iggy-pinot-network:
+    driver: bridge
diff --git 
a/foreign/java/external-processors/iggy-connector-pinot/examples/sample-messages.json
 
b/foreign/java/external-processors/iggy-connector-pinot/examples/sample-messages.json
new file mode 100644
index 000000000..df0cbcfaa
--- /dev/null
+++ 
b/foreign/java/external-processors/iggy-connector-pinot/examples/sample-messages.json
@@ -0,0 +1,47 @@
+{
+  "description": "Sample messages to send to Iggy for testing Pinot ingestion",
+  "messages": [
+    {
+      "userId": "user_12345",
+      "sessionId": "session_abc123",
+      "eventType": "page_view",
+      "pageUrl": "/products/laptop",
+      "deviceType": "desktop",
+      "browser": "Chrome",
+      "country": "USA",
+      "city": "San Francisco",
+      "duration": 45000,
+      "pageLoadTime": 1200,
+      "scrollDepth": 75,
+      "eventTimestamp": 1701234567890
+    },
+    {
+      "userId": "user_67890",
+      "sessionId": "session_def456",
+      "eventType": "click",
+      "pageUrl": "/checkout",
+      "deviceType": "mobile",
+      "browser": "Safari",
+      "country": "UK",
+      "city": "London",
+      "duration": 2000,
+      "pageLoadTime": 800,
+      "scrollDepth": 100,
+      "eventTimestamp": 1701234570000
+    },
+    {
+      "userId": "user_12345",
+      "sessionId": "session_abc123",
+      "eventType": "purchase",
+      "pageUrl": "/confirmation",
+      "deviceType": "desktop",
+      "browser": "Chrome",
+      "country": "USA",
+      "city": "San Francisco",
+      "duration": 10000,
+      "pageLoadTime": 950,
+      "scrollDepth": 50,
+      "eventTimestamp": 1701234580000
+    }
+  ]
+}
diff --git 
a/foreign/java/external-processors/iggy-connector-pinot/examples/schema.json 
b/foreign/java/external-processors/iggy-connector-pinot/examples/schema.json
new file mode 100644
index 000000000..77b226091
--- /dev/null
+++ b/foreign/java/external-processors/iggy-connector-pinot/examples/schema.json
@@ -0,0 +1,59 @@
+{
+  "schemaName": "user_events",
+  "dimensionFieldSpecs": [
+    {
+      "name": "userId",
+      "dataType": "STRING"
+    },
+    {
+      "name": "sessionId",
+      "dataType": "STRING"
+    },
+    {
+      "name": "eventType",
+      "dataType": "STRING"
+    },
+    {
+      "name": "pageUrl",
+      "dataType": "STRING"
+    },
+    {
+      "name": "deviceType",
+      "dataType": "STRING"
+    },
+    {
+      "name": "browser",
+      "dataType": "STRING"
+    },
+    {
+      "name": "country",
+      "dataType": "STRING"
+    },
+    {
+      "name": "city",
+      "dataType": "STRING"
+    }
+  ],
+  "metricFieldSpecs": [
+    {
+      "name": "duration",
+      "dataType": "LONG"
+    },
+    {
+      "name": "pageLoadTime",
+      "dataType": "INT"
+    },
+    {
+      "name": "scrollDepth",
+      "dataType": "INT"
+    }
+  ],
+  "dateTimeFieldSpecs": [
+    {
+      "name": "eventTimestamp",
+      "dataType": "LONG",
+      "format": "1:MILLISECONDS:EPOCH",
+      "granularity": "1:MILLISECONDS"
+    }
+  ]
+}
diff --git 
a/foreign/java/external-processors/iggy-connector-pinot/examples/table-config.json
 
b/foreign/java/external-processors/iggy-connector-pinot/examples/table-config.json
new file mode 100644
index 000000000..a6b16ad95
--- /dev/null
+++ 
b/foreign/java/external-processors/iggy-connector-pinot/examples/table-config.json
@@ -0,0 +1,43 @@
+{
+  "tableName": "user_events_realtime",
+  "tableType": "REALTIME",
+  "segmentsConfig": {
+    "timeColumnName": "eventTimestamp",
+    "timeType": "MILLISECONDS",
+    "replication": "1",
+    "schemaName": "user_events"
+  },
+  "tenants": {
+    "broker": "DefaultTenant",
+    "server": "DefaultTenant"
+  },
+  "tableIndexConfig": {
+    "loadMode": "MMAP",
+    "streamConfigs": {
+      "streamType": "iggy",
+      "stream.iggy.consumer.type": "lowlevel",
+      "stream.iggy.consumer.factory.class.name": 
"org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory",
+      "stream.iggy.decoder.class.name": 
"org.apache.iggy.connector.pinot.decoder.IggyJsonMessageDecoder",
+
+      "stream.iggy.host": "localhost",
+      "stream.iggy.port": "8090",
+      "stream.iggy.username": "iggy",
+      "stream.iggy.password": "iggy",
+      "stream.iggy.enable.tls": "false",
+
+      "stream.iggy.stream.id": "analytics",
+      "stream.iggy.topic.id": "user-events",
+      "stream.iggy.consumer.group": "pinot-realtime-consumer",
+
+      "stream.iggy.poll.batch.size": "1000",
+      "stream.iggy.connection.pool.size": "8",
+
+      "realtime.segment.flush.threshold.rows": "50000",
+      "realtime.segment.flush.threshold.time": "3600000",
+      "realtime.segment.flush.threshold.segment.size": "100M"
+    }
+  },
+  "metadata": {
+    "customConfigs": {}
+  }
+}
diff --git 
a/foreign/java/external-processors/iggy-connector-pinot/integration-test.sh 
b/foreign/java/external-processors/iggy-connector-pinot/integration-test.sh
new file mode 100755
index 000000000..4e42a37bf
--- /dev/null
+++ b/foreign/java/external-processors/iggy-connector-pinot/integration-test.sh
@@ -0,0 +1,234 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -e
+
+# Colors for output
+GREEN='\033[0;32m'
+YELLOW='\033[1;33m'
+RED='\033[0;31m'
+NC='\033[0m' # No Color
+
+echo -e "${GREEN}=====================================${NC}"
+echo -e "${GREEN}Iggy-Pinot Integration Test${NC}"
+echo -e "${GREEN}=====================================${NC}"
+
+# Navigate to connector directory
+cd "$(dirname "$0")"
+
+# Step 1: Build JARs
+echo -e "\n${YELLOW}Step 1: Building JARs...${NC}"
+cd ../../
+gradle :iggy-connector-pinot:jar :iggy:jar
+cd external-processors/iggy-connector-pinot
+echo -e "${GREEN}✓ JARs built successfully${NC}"
+
+# Step 2: Start Docker environment
+echo -e "\n${YELLOW}Step 2: Starting Docker environment...${NC}"
+docker-compose down -v
+docker-compose up -d
+echo -e "${GREEN}✓ Docker containers starting${NC}"
+
+# Step 3: Wait for services to be healthy
+echo -e "\n${YELLOW}Step 3: Waiting for services to be healthy...${NC}"
+
+echo -n "Waiting for Iggy... "
+for i in {1..30}; do
+    if curl --connect-timeout 3 --max-time 5 -s http://localhost:3000/ > 
/dev/null 2>&1; then
+        echo -e "${GREEN}✓${NC}"
+        break
+    fi
+    sleep 2
+    echo -n "."
+done
+
+echo -n "Waiting for Pinot Controller... "
+for i in {1..60}; do
+    if curl --connect-timeout 3 --max-time 5 -s http://localhost:9000/health > 
/dev/null 2>&1; then
+        echo -e "${GREEN}✓${NC}"
+        break
+    fi
+    sleep 2
+    echo -n "."
+done
+
+echo -n "Waiting for Pinot Broker... "
+for i in {1..60}; do
+    if curl --connect-timeout 3 --max-time 5 -s http://localhost:8099/health > 
/dev/null 2>&1; then
+        echo -e "${GREEN}✓${NC}"
+        break
+    fi
+    sleep 2
+    echo -n "."
+done
+
+echo -n "Waiting for Pinot Server... "
+for i in {1..60}; do
+    if curl --connect-timeout 3 --max-time 5 -s http://localhost:8097/health > 
/dev/null 2>&1; then
+        echo -e "${GREEN}✓${NC}"
+        break
+    fi
+    sleep 2
+    echo -n "."
+done
+
+sleep 5 # Extra time for services to stabilize
+
+# Step 4: Login to Iggy and create stream/topic
+echo -e "\n${YELLOW}Step 4: Logging in to Iggy and creating 
stream/topic...${NC}"
+
+# Login and get JWT token
+TOKEN=$(curl -s -X POST "http://localhost:3000/users/login" \
+  -H "Content-Type: application/json" \
+  -d '{"username": "iggy", "password": "iggy"}' | jq -r '.access_token.token')
+
+if [ -z "$TOKEN" ] || [ "$TOKEN" = "null" ]; then
+  echo -e "${RED}✗ Failed to get authentication token${NC}"
+  exit 1
+fi
+
+echo -e "${GREEN}✓ Authenticated${NC}"
+
+# Create stream
+curl -s -X POST "http://localhost:3000/streams" \
+  -H "Authorization: Bearer $TOKEN" \
+  -H "Content-Type: application/json" \
+  -d '{"stream_id": 1, "name": "test-stream"}' \
+  && echo -e "${GREEN}✓ Stream created${NC}" || echo -e "${RED}✗ Stream 
creation failed (may already exist)${NC}"
+
+# Create topic
+TOPIC_RESPONSE=$(curl -s -X POST "http://localhost:3000/streams/test-stream/topics" \
+  -H "Authorization: Bearer $TOKEN" \
+  -H "Content-Type: application/json" \
+  -d '{"topic_id": 1, "name": "test-events", "partitions_count": 2, 
"compression_algorithm": "none", "message_expiry": 0, "max_topic_size": 0}')
+
+if echo "$TOPIC_RESPONSE" | grep -q '"id"'; then
+  echo -e "${GREEN}✓ Topic created${NC}"
+else
+  echo -e "${RED}✗ Topic creation failed: $TOPIC_RESPONSE${NC}"
+  exit 1
+fi
+
+# Create consumer group (topic-scoped, not stream-scoped)
+curl -s -X POST "http://localhost:3000/streams/test-stream/topics/test-events/consumer-groups" \
+  -H "Authorization: Bearer $TOKEN" \
+  -H "Content-Type: application/json" \
+  -d '{"name": "pinot-integration-test"}' \
+  && echo -e "${GREEN}✓ Consumer group created${NC}" || echo -e 
"${YELLOW}Note: Consumer group may already exist${NC}"
+
+# Step 5: Create Pinot schema
+echo -e "\n${YELLOW}Step 5: Creating Pinot schema...${NC}"
+curl -X POST "http://localhost:9000/schemas" \
+  -H "Content-Type: application/json" \
+  -d @deployment/schema.json \
+  && echo -e "${GREEN}✓ Schema created${NC}" || echo -e "${RED}✗ Schema 
creation failed${NC}"
+
+# Step 6: Create Pinot table
+echo -e "\n${YELLOW}Step 6: Creating Pinot realtime table...${NC}"
+TABLE_RESPONSE=$(curl -s -X POST "http://localhost:9000/tables" \
+  -H "Content-Type: application/json" \
+  -d @deployment/table.json)
+
+if echo "$TABLE_RESPONSE" | grep -q '"status":"Table test_events_REALTIME 
succesfully added"'; then
+  echo -e "${GREEN}✓ Table created${NC}"
+elif echo "$TABLE_RESPONSE" | grep -q '"code":500'; then
+  echo -e "${RED}✗ Table creation failed${NC}"
+  echo "$TABLE_RESPONSE" | jq '.'
+  exit 1
+else
+  echo -e "${GREEN}✓ Table created${NC}"
+fi
+
+sleep 5 # Let table initialize
+
+# Step 7: Send test messages to Iggy
+echo -e "\n${YELLOW}Step 7: Sending test messages to Iggy...${NC}"
+
+# Partition value for partition 0 (4-byte little-endian, base64 encoded)
+PARTITION_VALUE=$(printf '\x00\x00\x00\x00' | base64)
+
+for i in {1..10}; do
+    TIMESTAMP=$(($(date +%s) * 1000))
+    MESSAGE=$(cat <<EOF
+{
+  "userId": "user$i",
+  "eventType": "test_event",
+  "deviceType": "desktop",
+  "duration": $((i * 100)),
+  "timestamp": $TIMESTAMP
+}
+EOF
+)
+
+    curl -X POST "http://localhost:3000/streams/test-stream/topics/test-events/messages" \
+      -H "Authorization: Bearer $TOKEN" \
+      -H "Content-Type: application/json" \
+      -d "{\"partitioning\": {\"kind\": \"partition_id\", \"value\": 
\"$PARTITION_VALUE\"}, \"messages\": [{\"payload\": \"$(echo "$MESSAGE" | 
base64)\"}]}" \
+      > /dev/null 2>&1
+    echo -e "${GREEN}✓ Message $i sent${NC}"
+    sleep 1
+done
+
+# Step 8: Wait for ingestion
+echo -e "\n${YELLOW}Step 8: Waiting for Pinot to ingest messages...${NC}"
+sleep 15
+
+# Step 9: Query Pinot and verify data
+echo -e "\n${YELLOW}Step 9: Querying Pinot for ingested data...${NC}"
+
+QUERY_RESULT=$(curl -s -X POST "http://localhost:8099/query/sql" \
+  -H "Content-Type: application/json" \
+  -d '{"sql": "SELECT COUNT(*) FROM test_events_REALTIME"}')
+
+echo "Query Result:"
+echo "$QUERY_RESULT" | jq '.'
+
+# Extract count from result
+COUNT=$(echo "$QUERY_RESULT" | jq -r '.resultTable.rows[0][0]' 2>/dev/null || 
echo "0")
+
+if [ "$COUNT" -gt "0" ]; then
+    echo -e "\n${GREEN}=====================================${NC}"
+    echo -e "${GREEN}✓ Integration Test PASSED!${NC}"
+    echo -e "${GREEN}Successfully ingested $COUNT messages${NC}"
+    echo -e "${GREEN}=====================================${NC}"
+
+    # Show sample data
+    echo -e "\n${YELLOW}Sample data:${NC}"
+    curl -s -X POST "http://localhost:8099/query/sql" \
+      -H "Content-Type: application/json" \
+      -d '{"sql": "SELECT * FROM test_events_REALTIME LIMIT 5"}' | jq '.'
+
+    EXIT_CODE=0
+else
+    echo -e "\n${RED}=====================================${NC}"
+    echo -e "${RED}✗ Integration Test FAILED!${NC}"
+    echo -e "${RED}No messages ingested${NC}"
+    echo -e "${RED}=====================================${NC}"
+
+    # Show logs for debugging
+    echo -e "\n${YELLOW}Pinot Server logs:${NC}"
+    docker logs pinot-server --tail 50
+
+    EXIT_CODE=1
+fi
+
+# Cleanup option
+echo -e "\n${YELLOW}To stop the environment: docker-compose down -v${NC}"
+echo -e "${YELLOW}To view logs: docker-compose logs -f${NC}"
+
+exit $EXIT_CODE
diff --git 
a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/config/IggyStreamConfig.java
 
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/config/IggyStreamConfig.java
new file mode 100644
index 000000000..28ef8c55d
--- /dev/null
+++ 
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/config/IggyStreamConfig.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.config;
+
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.StreamConfig;
+
+import java.util.Map;
+
+/**
+ * Configuration class for Iggy stream ingestion in Pinot.
+ * Extracts and validates Iggy-specific properties from Pinot's streamConfigs.
+ *
+ * <p>Configuration properties (with prefix "stream.iggy."):
+ * <ul>
+ *   <li>host - Iggy server hostname (required)</li>
+ *   <li>port - Iggy server TCP port (default: 8090)</li>
+ *   <li>username - Authentication username (default: "iggy")</li>
+ *   <li>password - Authentication password (default: "iggy")</li>
+ *   <li>stream.id - Iggy stream identifier (required)</li>
+ *   <li>topic.id - Iggy topic identifier (required)</li>
+ *   <li>consumer.group - Consumer group name (required)</li>
+ *   <li>connection.pool.size - TCP connection pool size (default: 4)</li>
+ *   <li>poll.batch.size - Number of messages to fetch per poll (default: 
100)</li>
+ *   <li>enable.tls - Enable TLS encryption (default: false)</li>
+ * </ul>
+ */
+public class IggyStreamConfig {
+
+    private static final String IGGY_PREFIX = "stream.iggy.";
+
+    // Connection properties
+    private static final String HOST_KEY = IGGY_PREFIX + "host";
+    private static final String PORT_KEY = IGGY_PREFIX + "port";
+    private static final String USERNAME_KEY = IGGY_PREFIX + "username";
+    private static final String PASSWORD_KEY = IGGY_PREFIX + "password";
+    private static final String ENABLE_TLS_KEY = IGGY_PREFIX + "enable.tls";
+    private static final String CONNECTION_POOL_SIZE_KEY = IGGY_PREFIX + 
"connection.pool.size";
+
+    // Stream/Topic properties
+    private static final String STREAM_ID_KEY = IGGY_PREFIX + "stream.id";
+    private static final String TOPIC_ID_KEY = IGGY_PREFIX + "topic.id";
+
+    // Consumer properties
+    private static final String CONSUMER_GROUP_KEY = IGGY_PREFIX + 
"consumer.group";
+    private static final String POLL_BATCH_SIZE_KEY = IGGY_PREFIX + 
"poll.batch.size";
+
+    // Default values
+    private static final int DEFAULT_PORT = 8090;
+    private static final String DEFAULT_USERNAME = "iggy";
+    private static final String DEFAULT_PASSWORD = "iggy";
+    private static final boolean DEFAULT_ENABLE_TLS = false;
+    private static final int DEFAULT_CONNECTION_POOL_SIZE = 4;
+    private static final int DEFAULT_POLL_BATCH_SIZE = 100;
+
+    private final StreamConfig streamConfig;
+    private final Map<String, String> props;
+
+    /**
+     * Creates a new Iggy stream configuration from Pinot's StreamConfig.
+     *
+     * @param streamConfig Pinot stream configuration
+     */
+    public IggyStreamConfig(StreamConfig streamConfig) {
+        this.streamConfig = streamConfig;
+        this.props = streamConfig.getStreamConfigsMap();
+        validate();
+    }
+
+    /**
+     * Validates required configuration properties.
+     *
+     * @throws IllegalArgumentException if required properties are missing
+     */
+    private void validate() {
+        requireProperty(HOST_KEY, "Iggy server host is required");
+        requireProperty(STREAM_ID_KEY, "Iggy stream ID is required");
+        requireProperty(TOPIC_ID_KEY, "Iggy topic ID is required");
+        requireProperty(CONSUMER_GROUP_KEY, "Iggy consumer group is required");
+    }
+
+    private void requireProperty(String key, String errorMessage) {
+        if (!props.containsKey(key)
+                || props.get(key) == null
+                || props.get(key).trim().isEmpty()) {
+            throw new IllegalArgumentException(errorMessage + " (property: " + 
key + ")");
+        }
+    }
+
+    public String getHost() {
+        return props.get(HOST_KEY);
+    }
+
+    public int getPort() {
+        String portStr = props.get(PORT_KEY);
+        return portStr != null ? Integer.parseInt(portStr) : DEFAULT_PORT;
+    }
+
+    public String getUsername() {
+        return props.getOrDefault(USERNAME_KEY, DEFAULT_USERNAME);
+    }
+
+    public String getPassword() {
+        return props.getOrDefault(PASSWORD_KEY, DEFAULT_PASSWORD);
+    }
+
+    public boolean isEnableTls() {
+        String tlsStr = props.get(ENABLE_TLS_KEY);
+        return tlsStr != null ? Boolean.parseBoolean(tlsStr) : 
DEFAULT_ENABLE_TLS;
+    }
+
+    public int getConnectionPoolSize() {
+        String poolSizeStr = props.get(CONNECTION_POOL_SIZE_KEY);
+        return poolSizeStr != null ? Integer.parseInt(poolSizeStr) : 
DEFAULT_CONNECTION_POOL_SIZE;
+    }
+
+    public String getStreamId() {
+        return props.get(STREAM_ID_KEY);
+    }
+
+    public String getTopicId() {
+        return props.get(TOPIC_ID_KEY);
+    }
+
+    public String getConsumerGroup() {
+        return props.get(CONSUMER_GROUP_KEY);
+    }
+
+    public int getPollBatchSize() {
+        String batchSizeStr = props.get(POLL_BATCH_SIZE_KEY);
+        return batchSizeStr != null ? Integer.parseInt(batchSizeStr) : 
DEFAULT_POLL_BATCH_SIZE;
+    }
+
+    /**
+     * Gets the offset specification from Pinot's consumer config.
+     *
+     * @return offset criteria
+     */
+    public OffsetCriteria getOffsetCriteria() {
+        return streamConfig.getOffsetCriteria();
+    }
+
+    /**
+     * Gets the Pinot table name for this stream.
+     *
+     * @return table name with type suffix
+     */
+    public String getTableNameWithType() {
+        return streamConfig.getTableNameWithType();
+    }
+
+    /**
+     * Creates server address in format "host:port".
+     *
+     * @return server address string
+     */
+    public String getServerAddress() {
+        return getHost() + ":" + getPort();
+    }
+
+    @Override
+    public String toString() {
+        return "IggyStreamConfig{"
+                + "host='"
+                + getHost()
+                + '\''
+                + ", port="
+                + getPort()
+                + ", username='"
+                + getUsername()
+                + '\''
+                + ", streamId='"
+                + getStreamId()
+                + '\''
+                + ", topicId='"
+                + getTopicId()
+                + '\''
+                + ", consumerGroup='"
+                + getConsumerGroup()
+                + '\''
+                + ", enableTls="
+                + isEnableTls()
+                + ", connectionPoolSize="
+                + getConnectionPoolSize()
+                + ", pollBatchSize="
+                + getPollBatchSize()
+                + '}';
+    }
+}
diff --git 
a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyConsumerFactory.java
 
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyConsumerFactory.java
new file mode 100644
index 000000000..8c10f4f60
--- /dev/null
+++ 
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyConsumerFactory.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.consumer;
+
+import org.apache.iggy.connector.pinot.config.IggyStreamConfig;
+import org.apache.iggy.connector.pinot.metadata.IggyStreamMetadataProvider;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+
+/**
+ * Factory for creating Iggy stream consumers and metadata providers.
+ * This is the main entry point for Pinot's stream ingestion framework to interact with Iggy.
+ *
+ * <p>Configuration in Pinot table config:
+ * <pre>{@code
+ * "streamConfigs": {
+ *   "streamType": "iggy",
+ *   "stream.iggy.consumer.factory.class.name": "org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory",
+ *   "stream.iggy.host": "localhost",
+ *   "stream.iggy.port": "8090",
+ *   "stream.iggy.username": "iggy",
+ *   "stream.iggy.password": "iggy",
+ *   "stream.iggy.stream.id": "my-stream",
+ *   "stream.iggy.topic.id": "my-topic",
+ *   "stream.iggy.consumer.group": "pinot-consumer-group",
+ *   "stream.iggy.poll.batch.size": "100"
+ * }
+ * }</pre>
+ */
+public class IggyConsumerFactory extends StreamConsumerFactory {
+
+    /** Raw Pinot stream configuration handed over through {@link #init(StreamConfig)}. */
+    private StreamConfig streamConfig;
+
+    /**
+     * Remembers the Pinot stream configuration; every {@code create*} method derives a fresh
+     * {@link IggyStreamConfig} from it.
+     *
+     * @param streamConfig stream configuration supplied by Pinot
+     */
+    @Override
+    public void init(StreamConfig streamConfig) {
+        this.streamConfig = streamConfig;
+    }
+
+    /**
+     * Builds a consumer bound to the partition group named in the given consumption status.
+     *
+     * @param clientId unique identifier for this consumer instance
+     * @param partitionGroupConsumptionStatus status object from which the partition group ID is read
+     * @return a new {@link IggyPartitionGroupConsumer}
+     */
+    @Override
+    public PartitionGroupConsumer createPartitionGroupConsumer(
+            String clientId, PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
+        final IggyStreamConfig iggyConfig = newIggyConfig();
+        return new IggyPartitionGroupConsumer(
+                clientId, iggyConfig, partitionGroupConsumptionStatus.getPartitionGroupId());
+    }
+
+    /**
+     * Builds a {@link PartitionLevelConsumer} for the given partition by wrapping an
+     * {@link IggyPartitionGroupConsumer} in an {@link IggyPartitionLevelConsumer}.
+     *
+     * @param clientId unique identifier for this consumer instance
+     * @param partition partition identifier
+     * @return a new wrapping consumer instance
+     */
+    @Override
+    public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {
+        final IggyStreamConfig iggyConfig = newIggyConfig();
+        return new IggyPartitionLevelConsumer(new IggyPartitionGroupConsumer(clientId, iggyConfig, partition));
+    }
+
+    /**
+     * Builds a metadata provider scoped to a single partition group.
+     *
+     * @param clientId unique identifier for this metadata provider instance
+     * @param groupId partition group identifier
+     * @return a new {@link IggyStreamMetadataProvider}
+     */
+    @Override
+    public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int groupId) {
+        final IggyStreamConfig iggyConfig = newIggyConfig();
+        return new IggyStreamMetadataProvider(clientId, iggyConfig, groupId);
+    }
+
+    /**
+     * Builds a metadata provider that is not tied to a particular partition group.
+     *
+     * @param clientId unique identifier for this metadata provider instance
+     * @return a new {@link IggyStreamMetadataProvider}
+     */
+    @Override
+    public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
+        final IggyStreamConfig iggyConfig = newIggyConfig();
+        return new IggyStreamMetadataProvider(clientId, iggyConfig);
+    }
+
+    /** Wraps the stored Pinot stream configuration in a new {@link IggyStreamConfig}. */
+    private IggyStreamConfig newIggyConfig() {
+        return new IggyStreamConfig(this.streamConfig);
+    }
+}
diff --git 
a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyMessageBatch.java
 
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyMessageBatch.java
new file mode 100644
index 000000000..5e59ba6a8
--- /dev/null
+++ 
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyMessageBatch.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.consumer;
+
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.util.List;
+
+/**
+ * Implementation of Pinot's MessageBatch for Iggy messages.
+ * Wraps a list of messages with their offsets for consumption by Pinot.
+ */
+public class IggyMessageBatch implements MessageBatch<byte[]> {
+
+    /** Messages of this batch in the order they were polled, each paired with its own offset. */
+    private final List<IggyMessageAndOffset> messages;
+
+    /**
+     * Creates a new message batch.
+     *
+     * @param messages list of messages with offsets; the list is stored as-is (not copied)
+     */
+    public IggyMessageBatch(List<IggyMessageAndOffset> messages) {
+        this.messages = messages;
+    }
+
+    /** @return number of messages held by this batch */
+    @Override
+    public int getMessageCount() {
+        return messages.size();
+    }
+
+    /**
+     * @param index position of the message inside this batch
+     * @return the payload bytes of the message at {@code index}
+     */
+    @Override
+    public byte[] getMessageAtIndex(int index) {
+        return messages.get(index).message;
+    }
+
+    /**
+     * Returns the position, inside the array returned by {@link #getMessageAtIndex(int)}, at which
+     * the payload starts. Each message owns its own array, so the payload always starts at 0.
+     * Returning the list index here would make a ranged decode read past the end of the array
+     * for every message but the first.
+     *
+     * @param index position of the message inside this batch
+     * @return always {@code 0}
+     */
+    @Override
+    public int getMessageOffsetAtIndex(int index) {
+        return 0;
+    }
+
+    /**
+     * @param index position of the message inside this batch
+     * @return length in bytes of the payload at {@code index}
+     */
+    @Override
+    public int getMessageLengthAtIndex(int index) {
+        return messages.get(index).message.length;
+    }
+
+    /**
+     * Returns the stream offset that follows the message at {@code index} (its own offset + 1),
+     * matching the "next offset" computed in {@link #getStreamMessage(int)}.
+     *
+     * @param index position of the message inside this batch
+     * @return the following offset, or {@code 0} when {@code index} is out of range
+     */
+    @Override
+    public long getNextStreamMessageOffsetAtIndex(int index) {
+        if (isValidIndex(index)) {
+            return offsetAfter(index);
+        }
+        return 0;
+    }
+
+    /**
+     * Returns the stream offset that follows the message at {@code index} (its own offset + 1),
+     * matching the "next offset" computed in {@link #getStreamMessage(int)}.
+     *
+     * @param index position of the message inside this batch
+     * @return the following offset, or {@code null} when {@code index} is out of range
+     */
+    @Override
+    public StreamPartitionMsgOffset getNextStreamPartitionMsgOffsetAtIndex(int index) {
+        if (isValidIndex(index)) {
+            return new IggyStreamPartitionMsgOffset(offsetAfter(index));
+        }
+        return null;
+    }
+
+    /**
+     * Wraps the message at {@code index} in a Pinot {@link StreamMessage} whose metadata carries
+     * the message offset, the following offset and the current wall-clock time as ingestion time.
+     *
+     * @param index position of the message inside this batch
+     * @return stream message with a {@code null} key and the raw payload as value
+     */
+    @Override
+    public StreamMessage<byte[]> getStreamMessage(int index) {
+        IggyMessageAndOffset messageAndOffset = messages.get(index);
+
+        // Next offset is the message's own offset + 1
+        IggyStreamPartitionMsgOffset nextOffset = new IggyStreamPartitionMsgOffset(offsetAfter(index));
+
+        StreamMessageMetadata metadata = new StreamMessageMetadata.Builder()
+                .setRecordIngestionTimeMs(System.currentTimeMillis())
+                .setOffset(messageAndOffset.offset, nextOffset)
+                .build();
+
+        return new StreamMessage<>(null, messageAndOffset.message, messageAndOffset.message.length, metadata);
+    }
+
+    /**
+     * Returns the offset from which the next batch should be fetched.
+     *
+     * @return last message offset + 1; for an empty batch offset {@code 0} is returned because the
+     *     batch has no record of the offset it was fetched from
+     */
+    @Override
+    public StreamPartitionMsgOffset getOffsetOfNextBatch() {
+        if (messages.isEmpty()) {
+            return new IggyStreamPartitionMsgOffset(0);
+        }
+        return new IggyStreamPartitionMsgOffset(offsetAfter(messages.size() - 1));
+    }
+
+    /** Tells whether {@code index} addresses a message of this batch. */
+    private boolean isValidIndex(int index) {
+        return index >= 0 && index < messages.size();
+    }
+
+    /** Returns the offset immediately following the message at {@code index}. */
+    private long offsetAfter(int index) {
+        return messages.get(index).offset.getOffset() + 1;
+    }
+
+    /**
+     * Container for an Iggy message and its offset.
+     */
+    public static class IggyMessageAndOffset {
+        private final byte[] message;
+        private final IggyStreamPartitionMsgOffset offset;
+
+        /**
+         * @param message raw message payload
+         * @param offset offset of the message within its partition
+         */
+        public IggyMessageAndOffset(byte[] message, IggyStreamPartitionMsgOffset offset) {
+            this.message = message;
+            this.offset = offset;
+        }
+
+        public byte[] getMessage() {
+            return message;
+        }
+
+        public IggyStreamPartitionMsgOffset getOffset() {
+            return offset;
+        }
+    }
+}
diff --git 
a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionGroupConsumer.java
 
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionGroupConsumer.java
new file mode 100644
index 000000000..37ba72fc0
--- /dev/null
+++ 
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionGroupConsumer.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.consumer;
+
+import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
+import org.apache.iggy.connector.pinot.config.IggyStreamConfig;
+import org.apache.iggy.consumergroup.Consumer;
+import org.apache.iggy.identifier.ConsumerId;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.PolledMessages;
+import org.apache.iggy.message.PollingStrategy;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Partition-level consumer implementation for Iggy streams.
+ * Reads messages from a single Iggy partition using the AsyncIggyTcpClient.
+ *
+ * <p>This consumer manages:
+ * <ul>
+ *   <li>TCP connection to Iggy server</li>
+ *   <li>Single consumer mode (not consumer groups)</li>
+ *   <li>Message polling with explicit offset tracking</li>
+ *   <li>Offset management controlled by Pinot</li>
+ * </ul>
+ */
+public class IggyPartitionGroupConsumer implements PartitionGroupConsumer {
+
+    private static final Logger log = LoggerFactory.getLogger(IggyPartitionGroupConsumer.class);
+
+    private final IggyStreamConfig config;
+    private final int partitionId;
+
+    // Non-null only after a successful connect; see ensureConnected().
+    private AsyncIggyTcpClient asyncClient;
+    private StreamId streamId;
+    private TopicId topicId;
+    private Consumer consumer;
+    // Offset used for the next poll when Pinot does not supply a usable start offset.
+    private long currentOffset;
+
+    /**
+     * Creates a new partition consumer. No connection is opened here; it is established lazily
+     * on the first fetch.
+     *
+     * @param clientId unique identifier for this consumer (only used for logging)
+     * @param config Iggy stream configuration
+     * @param partitionId the partition to consume from
+     */
+    public IggyPartitionGroupConsumer(String clientId, IggyStreamConfig config, int partitionId) {
+        this.config = config;
+        this.partitionId = partitionId;
+        this.currentOffset = 0;
+
+        log.info(
+                "Created IggyPartitionGroupConsumer: clientId={}, partition={}, config={}",
+                clientId,
+                partitionId,
+                config);
+    }
+
+    /**
+     * Fetches the next batch of messages from the Iggy partition.
+     *
+     * <p>Runtime failures (connection or polling) are logged and reported as an empty batch, so
+     * the caller can simply try again on its next invocation.
+     *
+     * @param startOffset the offset to start consuming from (may be null, in which case the
+     *     internally tracked offset is used)
+     * @param timeoutMillis timeout for the fetch operation; NOTE: currently not applied, the
+     *     underlying poll blocks until it completes
+     * @return batch of messages, or empty batch if no messages are available or an error occurred
+     */
+    @Override
+    public MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, int timeoutMillis) {
+        try {
+            ensureConnected();
+
+            long fetchOffset = determineStartOffset(startOffset);
+            log.debug("Fetching messages from partition {} at offset {}", partitionId, fetchOffset);
+
+            PolledMessages polledMessages = pollMessages(fetchOffset);
+            List<IggyMessageBatch.IggyMessageAndOffset> messages = convertMessages(polledMessages);
+
+            // Advance by what was actually returned: the last delivered message's offset + 1.
+            // The partition-level offset reported by the server may be ahead of this batch.
+            if (!messages.isEmpty()) {
+                currentOffset = messages.get(messages.size() - 1).getOffset().getOffset() + 1;
+            }
+
+            return new IggyMessageBatch(messages);
+
+        } catch (RuntimeException e) {
+            log.error("Error fetching messages from partition {}: {}", partitionId, e.getMessage(), e);
+            return new IggyMessageBatch(new ArrayList<>());
+        }
+    }
+
+    /**
+     * Ensures TCP connection to Iggy server is established.
+     *
+     * <p>The client is published to {@link #asyncClient} only after the connect succeeded and
+     * the identifiers were resolved. A failed attempt therefore leaves this consumer
+     * "disconnected", and the next fetch retries instead of reusing a half-initialized client.
+     */
+    private void ensureConnected() {
+        if (asyncClient != null) {
+            return;
+        }
+
+        log.info("Connecting to Iggy server: {}", config.getServerAddress());
+
+        AsyncIggyTcpClient client = AsyncIggyTcpClient.builder()
+                .host(config.getHost())
+                .port(config.getPort())
+                .credentials(config.getUsername(), config.getPassword())
+                .connectionPoolSize(config.getConnectionPoolSize())
+                .build();
+
+        try {
+            // Connect and authenticate
+            client.connect().join();
+
+            streamId = parseStreamId(config.getStreamId());
+            topicId = parseTopicId(config.getTopicId());
+            // Use single consumer instead of consumer group for explicit offset control
+            consumer = Consumer.of(ConsumerId.of(Long.valueOf(partitionId)));
+        } catch (RuntimeException e) {
+            closeQuietly(client);
+            throw e;
+        }
+
+        asyncClient = client;
+        log.info("Connected to Iggy server successfully");
+    }
+
+    /** Best-effort close of a client whose setup failed; close errors are only logged. */
+    private void closeQuietly(AsyncIggyTcpClient client) {
+        try {
+            client.close().join();
+        } catch (RuntimeException closeError) {
+            log.warn("Error closing Iggy client after failed connect: {}", closeError.getMessage(), closeError);
+        }
+    }
+
+    /**
+     * Determines the starting offset for polling.
+     *
+     * <p>Both this connector's offset type and Pinot's {@code LongMsgOffset} are accepted, in
+     * line with {@link IggyStreamPartitionMsgOffset#compareTo}. Any other value (including
+     * null) falls back to the internally tracked offset.
+     */
+    private long determineStartOffset(StreamPartitionMsgOffset startOffset) {
+        if (startOffset instanceof IggyStreamPartitionMsgOffset) {
+            currentOffset = ((IggyStreamPartitionMsgOffset) startOffset).getOffset();
+            log.debug("Using provided start offset: {}", currentOffset);
+        } else if (startOffset instanceof org.apache.pinot.spi.stream.LongMsgOffset) {
+            currentOffset = ((org.apache.pinot.spi.stream.LongMsgOffset) startOffset).getOffset();
+            log.debug("Using provided start offset: {}", currentOffset);
+        } else {
+            log.debug("Using current tracked offset for partition {}: {}", partitionId, currentOffset);
+        }
+        return currentOffset;
+    }
+
+    /**
+     * Polls messages from Iggy using TCP client, starting at {@code fetchOffset} and asking for
+     * at most the configured poll batch size.
+     *
+     * @throws RuntimeException wrapping any failure of the poll call
+     */
+    private PolledMessages pollMessages(long fetchOffset) {
+        Optional<Long> partition = Optional.of((long) partitionId);
+
+        // Use explicit offset strategy to fetch from the offset Pinot requested
+        PollingStrategy strategy = PollingStrategy.offset(java.math.BigInteger.valueOf(fetchOffset));
+
+        log.debug(
+                "Polling messages: partition={}, offset={}, batchSize={}",
+                partitionId,
+                fetchOffset,
+                config.getPollBatchSize());
+
+        try {
+            // Poll with auto-commit disabled (offsets are managed via Pinot)
+            PolledMessages polledMessages = asyncClient
+                    .messages()
+                    .pollMessagesAsync(
+                            streamId,
+                            topicId,
+                            partition,
+                            consumer,
+                            strategy,
+                            Long.valueOf(config.getPollBatchSize()),
+                            false)
+                    .join();
+
+            log.debug(
+                    "Polled {} messages from partition {}, currentOffset={}",
+                    polledMessages.messages().size(),
+                    partitionId,
+                    polledMessages.currentOffset());
+
+            return polledMessages;
+
+        } catch (RuntimeException e) {
+            // Logged once by fetchMessages(); only add context here.
+            throw new RuntimeException(
+                    "Failed to poll messages from partition " + partitionId + " at offset " + fetchOffset, e);
+        }
+    }
+
+    /**
+     * Converts Iggy PolledMessages to payload/offset pairs, preserving the polled order.
+     */
+    private List<IggyMessageBatch.IggyMessageAndOffset> convertMessages(PolledMessages polledMessages) {
+        List<IggyMessageBatch.IggyMessageAndOffset> messages = new ArrayList<>();
+
+        for (Message message : polledMessages.messages()) {
+            long offset = message.header().offset().longValue();
+            byte[] payload = message.payload();
+
+            messages.add(new IggyMessageBatch.IggyMessageAndOffset(payload, new IggyStreamPartitionMsgOffset(offset)));
+        }
+
+        return messages;
+    }
+
+    /**
+     * Parses stream ID from string (supports both numeric and named streams).
+     */
+    private StreamId parseStreamId(String streamIdStr) {
+        try {
+            return StreamId.of(Long.parseLong(streamIdStr));
+        } catch (NumberFormatException e) {
+            // Not a number: treat the value as a stream name.
+            return StreamId.of(streamIdStr);
+        }
+    }
+
+    /**
+     * Parses topic ID from string (supports both numeric and named topics).
+     */
+    private TopicId parseTopicId(String topicIdStr) {
+        try {
+            return TopicId.of(Long.parseLong(topicIdStr));
+        } catch (NumberFormatException e) {
+            // Not a number: treat the value as a topic name.
+            return TopicId.of(topicIdStr);
+        }
+    }
+
+    /**
+     * Closes the client if one is open. Close errors are logged, not rethrown; afterwards the
+     * consumer is "disconnected" and a later fetch would reconnect.
+     */
+    @Override
+    public void close() {
+        if (asyncClient != null) {
+            try {
+                log.info("Closing Iggy consumer for partition {}", partitionId);
+                asyncClient.close().join();
+                log.info("Iggy consumer closed successfully");
+            } catch (RuntimeException e) {
+                log.error("Error closing Iggy client: {}", e.getMessage(), e);
+            } finally {
+                asyncClient = null;
+            }
+        }
+    }
+}
diff --git 
a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionLevelConsumer.java
 
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionLevelConsumer.java
new file mode 100644
index 000000000..5fa5b911e
--- /dev/null
+++ 
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionLevelConsumer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.consumer;
+
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Wrapper for IggyPartitionGroupConsumer to implement PartitionLevelConsumer interface.
+ * Delegates all operations to the underlying partition group consumer.
+ */
+public class IggyPartitionLevelConsumer implements PartitionLevelConsumer {
+
+    /** Consumer that does the actual work; every call on this class is forwarded to it. */
+    private final IggyPartitionGroupConsumer delegate;
+
+    /**
+     * @param delegate partition group consumer that all calls are forwarded to
+     */
+    public IggyPartitionLevelConsumer(IggyPartitionGroupConsumer delegate) {
+        this.delegate = delegate;
+    }
+
+    /**
+     * Forwards the fetch to the wrapped consumer and hands back its result unchanged.
+     *
+     * @param startOffset offset to start consuming from
+     * @param timeoutMs timeout passed through to the wrapped consumer
+     * @return the batch produced by the wrapped consumer
+     * @throws TimeoutException declared for the interface; nothing in this method raises it
+     */
+    @Override
+    public MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, int timeoutMs) throws TimeoutException {
+        final MessageBatch batch = this.delegate.fetchMessages(startOffset, timeoutMs);
+        return batch;
+    }
+
+    /** Closes the wrapped consumer. */
+    @Override
+    public void close() {
+        this.delegate.close();
+    }
+}
diff --git 
a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyStreamPartitionMsgOffset.java
 
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyStreamPartitionMsgOffset.java
new file mode 100644
index 000000000..6b7dce8bf
--- /dev/null
+++ 
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyStreamPartitionMsgOffset.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.consumer;
+
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+/**
+ * Represents an offset in an Iggy stream partition.
+ * Iggy uses monotonically increasing long values for offsets.
+ */
+public class IggyStreamPartitionMsgOffset implements StreamPartitionMsgOffset {
+
+    /** The wrapped numeric offset. */
+    private final long offset;
+
+    /**
+     * Wraps the given numeric offset.
+     *
+     * @param offset the Iggy offset value
+     */
+    public IggyStreamPartitionMsgOffset(long offset) {
+        this.offset = offset;
+    }
+
+    /** @return the wrapped numeric offset */
+    public long getOffset() {
+        return offset;
+    }
+
+    /**
+     * Orders offsets by their numeric value. Besides this class, Pinot's {@link LongMsgOffset}
+     * is accepted as the other operand.
+     *
+     * @param other offset to compare against
+     * @return negative, zero or positive as defined by {@link Long#compare(long, long)}
+     * @throws IllegalArgumentException if {@code other} is of any other offset type
+     */
+    @Override
+    public int compareTo(StreamPartitionMsgOffset other) {
+        final long otherValue;
+        if (other instanceof IggyStreamPartitionMsgOffset) {
+            otherValue = ((IggyStreamPartitionMsgOffset) other).offset;
+        } else if (other instanceof LongMsgOffset) {
+            otherValue = ((LongMsgOffset) other).getOffset();
+        } else {
+            throw new IllegalArgumentException("Cannot compare with incompatible offset type: " + other.getClass());
+        }
+        return Long.compare(this.offset, otherValue);
+    }
+
+    /** @return the offset rendered as a decimal number */
+    @Override
+    public String toString() {
+        return Long.toString(offset);
+    }
+
+    /**
+     * Two instances are equal when they are of exactly the same class and wrap the same value.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        final boolean sameClass = o != null && getClass() == o.getClass();
+        return sameClass && ((IggyStreamPartitionMsgOffset) o).offset == offset;
+    }
+
+    @Override
+    public int hashCode() {
+        return Long.valueOf(offset).hashCode();
+    }
+}
diff --git 
a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/decoder/IggyJsonMessageDecoder.java
 
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/decoder/IggyJsonMessageDecoder.java
new file mode 100644
index 000000000..1a619f348
--- /dev/null
+++ 
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/decoder/IggyJsonMessageDecoder.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.decoder;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * JSON message decoder for Iggy streams.
+ * Decodes JSON-formatted messages from Iggy into Pinot GenericRow format.
+ *
+ * <p>Configuration in Pinot table config:
+ * <pre>{@code
+ * "streamConfigs": {
+ *   "stream.iggy.decoder.class.name": "org.apache.iggy.connector.pinot.decoder.IggyJsonMessageDecoder"
+ * }
+ * }</pre>
+ */
+public class IggyJsonMessageDecoder implements StreamMessageDecoder<byte[]> {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /**
+     * Initializes the decoder with configuration.
+     * Nothing is configured at the moment: all three arguments are ignored and every top-level
+     * field of a message is copied into the row.
+     *
+     * @param props decoder properties from streamConfigs
+     * @param fieldsToRead set of fields to read from messages
+     * @param topicName topic name
+     * @throws Exception if initialization fails
+     */
+    @Override
+    public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName) throws Exception {
+        // No special initialization needed for basic JSON decoding
+    }
+
+    /**
+     * Decodes a JSON message payload into a GenericRow.
+     *
+     * @param payload raw byte array containing a JSON object
+     * @param destination row that receives one value per top-level JSON field
+     * @return {@code destination}, populated with the decoded fields
+     * @throws RuntimeException if the payload cannot be parsed as a JSON object
+     */
+    @Override
+    public GenericRow decode(byte[] payload, GenericRow destination) {
+        try {
+            @SuppressWarnings("unchecked")
+            Map<String, Object> jsonMap = OBJECT_MAPPER.readValue(payload, Map.class);
+            return populate(jsonMap, destination);
+        } catch (java.io.IOException e) {
+            throw new RuntimeException("Failed to decode JSON message", e);
+        }
+    }
+
+    /**
+     * Decodes the JSON message found in a sub-range of {@code payload} into a GenericRow.
+     * The range is parsed in place, without copying it into a temporary array.
+     *
+     * @param payload raw byte array containing JSON
+     * @param offset offset in the payload to start decoding
+     * @param length length of the message to decode
+     * @param destination destination GenericRow to populate
+     * @return {@code destination}, populated with the decoded fields
+     * @throws RuntimeException if the range cannot be parsed as a JSON object
+     */
+    @Override
+    public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
+        try {
+            @SuppressWarnings("unchecked")
+            Map<String, Object> jsonMap = OBJECT_MAPPER.readValue(payload, offset, length, Map.class);
+            return populate(jsonMap, destination);
+        } catch (java.io.IOException e) {
+            throw new RuntimeException("Failed to decode JSON message", e);
+        }
+    }
+
+    /** Copies every top-level entry of the parsed JSON object into {@code destination}. */
+    private GenericRow populate(Map<String, Object> jsonMap, GenericRow destination) {
+        if (jsonMap == null) {
+            // A literal JSON "null" parses to a null map; report it as a decode failure
+            // instead of tripping over a bare NullPointerException below.
+            throw new RuntimeException("Failed to decode JSON message: payload is not a JSON object");
+        }
+        for (Map.Entry<String, Object> entry : jsonMap.entrySet()) {
+            destination.putValue(entry.getKey(), entry.getValue());
+        }
+        return destination;
+    }
+}
diff --git 
a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/metadata/IggyStreamMetadataProvider.java
 
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/metadata/IggyStreamMetadataProvider.java
new file mode 100644
index 000000000..36ef0aa6e
--- /dev/null
+++ 
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/metadata/IggyStreamMetadataProvider.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.metadata;
+
+import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
+import org.apache.iggy.connector.pinot.config.IggyStreamConfig;
+import org.apache.iggy.connector.pinot.consumer.IggyStreamPartitionMsgOffset;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.partition.Partition;
+import org.apache.iggy.topic.TopicDetails;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/**
+ * Metadata provider for Iggy streams.
+ * Provides information about partitions, offsets, and message counts.
+ *
+ * <p>This provider connects to Iggy via TCP to query:
+ * <ul>
+ *   <li>Number of partitions in a topic</li>
+ *   <li>Oldest available offset per partition</li>
+ *   <li>Latest offset per partition</li>
+ *   <li>Message counts</li>
+ * </ul>
+ */
+public class IggyStreamMetadataProvider implements StreamMetadataProvider {
+
+    private static final Logger log = 
LoggerFactory.getLogger(IggyStreamMetadataProvider.class);
+
+    private static final long DETAILS_CACHE_MS = 5000; // 5 seconds cache
+
+    private final IggyStreamConfig config;
+    private final Integer partitionId; // null for stream-level, non-null for 
partition-level
+
+    private AsyncIggyTcpClient asyncClient;
+    private StreamId streamId;
+    private TopicId topicId;
+    private TopicDetails cachedTopicDetails;
+    private long lastDetailsRefresh;
+
+    /**
+     * Creates a stream-level metadata provider (all partitions).
+     *
+     * @param clientId unique identifier
+     * @param config Iggy stream configuration
+     */
+    public IggyStreamMetadataProvider(String clientId, IggyStreamConfig 
config) {
+        this(clientId, config, null);
+    }
+
+    /**
+     * Creates a partition-level metadata provider.
+     *
+     * @param clientId unique identifier
+     * @param config Iggy stream configuration
+     * @param partitionId specific partition ID
+     */
+    public IggyStreamMetadataProvider(String clientId, IggyStreamConfig 
config, Integer partitionId) {
+        this.config = config;
+        this.partitionId = partitionId;
+
+        log.info(
+                "Created IggyStreamMetadataProvider: clientId={}, 
partitionId={}, config={}",
+                clientId,
+                partitionId,
+                config);
+    }
+
+    /**
+     * Retrieves the number of partitions and their metadata.
+     * Called by Pinot to discover available partitions in the stream.
+     *
+     * @param timeoutMillis timeout for the operation
+     * @return number of partitions in the topic
+     */
+    @Override
+    public int fetchPartitionCount(long timeoutMillis) {
+        try {
+            ensureConnected();
+            TopicDetails topicDetails = fetchTopicDetails();
+            int partitionCount = topicDetails.partitionsCount().intValue();
+            log.info("Found {} partitions for topic {}", partitionCount, 
config.getTopicId());
+            return partitionCount;
+        } catch (RuntimeException e) {
+            log.error("Error fetching partition count: {}", e.getMessage(), e);
+            throw new RuntimeException("Failed to fetch partition count", e);
+        }
+    }
+
+    /**
+     * Fetches the current offset for consumption.
+     * For Iggy, we rely on consumer group state, so this returns the earliest 
offset.
+     *
+     * @param offsetCriteria offset criteria (earliest, latest, etc.)
+     * @param timeoutMillis timeout for the operation
+     * @return current offset for the partition
+     */
+    @Override
+    public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria 
offsetCriteria, long timeoutMillis) {
+        try {
+            ensureConnected();
+
+            if (partitionId == null) {
+                throw new IllegalStateException("Partition ID must be set for 
offset queries");
+            }
+
+            Partition partition = getPartitionInfo(partitionId);
+
+            // Handle offset criteria
+            if (offsetCriteria != null && offsetCriteria.isSmallest()) {
+                // Return earliest available offset (0 for Iggy)
+                return new IggyStreamPartitionMsgOffset(0);
+            } else if (offsetCriteria != null && offsetCriteria.isLargest()) {
+                // Return latest offset based on messages count
+                long latestOffset = partition.messagesCount().longValue();
+                return new IggyStreamPartitionMsgOffset(latestOffset);
+            } else {
+                // Default to consumer group managed offset (start from 0)
+                return new IggyStreamPartitionMsgOffset(0);
+            }
+
+        } catch (RuntimeException e) {
+            log.error("Error fetching partition offset: {}", e.getMessage(), 
e);
+            throw new RuntimeException("Failed to fetch partition offset", e);
+        }
+    }
+
+    /**
+     * Ensures TCP connection to Iggy server is established.
+     */
+    private void ensureConnected() {
+        if (asyncClient == null) {
+            log.info("Connecting to Iggy server: {}", 
config.getServerAddress());
+
+            asyncClient = AsyncIggyTcpClient.builder()
+                    .host(config.getHost())
+                    .port(config.getPort())
+                    .credentials(config.getUsername(), config.getPassword())
+                    .connectionPoolSize(config.getConnectionPoolSize())
+                    .build();
+
+            // Connect and authenticate
+            asyncClient.connect().join();
+
+            // Parse stream and topic IDs
+            streamId = parseStreamId(config.getStreamId());
+            topicId = parseTopicId(config.getTopicId());
+
+            log.info("Connected to Iggy server successfully");
+        }
+    }
+
+    /**
+     * Fetches topic details with caching.
+     */
+    private TopicDetails fetchTopicDetails() {
+        long now = System.currentTimeMillis();
+        if (cachedTopicDetails == null || (now - lastDetailsRefresh) > 
DETAILS_CACHE_MS) {
+            try {
+                Optional<TopicDetails> details =
+                        asyncClient.topics().getTopicAsync(streamId, 
topicId).join();
+                cachedTopicDetails =
+                        details.orElseThrow(() -> new RuntimeException("Topic 
not found: " + config.getTopicId()));
+                lastDetailsRefresh = now;
+            } catch (RuntimeException e) {
+                log.error("Error fetching topic details: {}", e.getMessage(), 
e);
+                throw new RuntimeException("Failed to fetch topic details", e);
+            }
+        }
+        return cachedTopicDetails;
+    }
+
+    /**
+     * Gets information for a specific partition.
+     */
+    private Partition getPartitionInfo(int partitionId) {
+        TopicDetails details = fetchTopicDetails();
+        return details.partitions().stream()
+                .filter(p -> p.id().intValue() == partitionId)
+                .findFirst()
+                .orElseThrow(() -> new IllegalArgumentException("Partition " + 
partitionId + " not found"));
+    }
+
+    /**
+     * Parses stream ID from string (supports both numeric and named streams).
+     */
+    private StreamId parseStreamId(String streamIdStr) {
+        try {
+            return StreamId.of(Long.parseLong(streamIdStr));
+        } catch (NumberFormatException e) {
+            return StreamId.of(streamIdStr);
+        }
+    }
+
+    /**
+     * Parses topic ID from string (supports both numeric and named topics).
+     */
+    private TopicId parseTopicId(String topicIdStr) {
+        try {
+            return TopicId.of(Long.parseLong(topicIdStr));
+        } catch (NumberFormatException e) {
+            return TopicId.of(topicIdStr);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (asyncClient != null) {
+            try {
+                log.info("Closing Iggy metadata provider");
+                asyncClient.close().join();
+                log.info("Iggy metadata provider closed successfully");
+            } catch (RuntimeException e) {
+                log.error("Error closing Iggy client: {}", e.getMessage(), e);
+            } finally {
+                asyncClient = null;
+            }
+        }
+    }
+}
diff --git 
a/foreign/java/external-processors/iggy-connector-pinot/src/main/resources/META-INF/services/org.apache.pinot.spi.stream.StreamConsumerFactory
 
b/foreign/java/external-processors/iggy-connector-pinot/src/main/resources/META-INF/services/org.apache.pinot.spi.stream.StreamConsumerFactory
new file mode 100644
index 000000000..2a72c4d89
--- /dev/null
+++ 
b/foreign/java/external-processors/iggy-connector-pinot/src/main/resources/META-INF/services/org.apache.pinot.spi.stream.StreamConsumerFactory
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#   http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory
diff --git 
a/foreign/java/external-processors/iggy-connector-pinot/src/main/resources/pinot-plugin.properties
 
b/foreign/java/external-processors/iggy-connector-pinot/src/main/resources/pinot-plugin.properties
new file mode 100644
index 000000000..98cae47c3
--- /dev/null
+++ 
b/foreign/java/external-processors/iggy-connector-pinot/src/main/resources/pinot-plugin.properties
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Iggy Stream Connector Plugin for Apache Pinot
+# This file is required for Pinot 1.3.0+ plugin discovery
+
+# Plugin name
+pluginName=iggy-connector
+
+# Plugin version
+pluginVersion=0.6.0
+
+# StreamConsumerFactory class
+stream.iggy.consumer.factory.class=org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory
+
+# MessageDecoder class
+stream.iggy.decoder.class=org.apache.iggy.connector.pinot.decoder.IggyJsonMessageDecoder
diff --git 
a/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/config/IggyStreamConfigTest.java
 
b/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/config/IggyStreamConfigTest.java
new file mode 100644
index 000000000..43f0ac7d2
--- /dev/null
+++ 
b/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/config/IggyStreamConfigTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.config;
+
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class IggyStreamConfigTest {
+
+    private static final String DEFAULT_TABLE = "test_table_REALTIME";
+
+    /** Every value supplied by the fixture is exposed unchanged through the getters. */
+    @Test
+    void testValidConfiguration() {
+        IggyStreamConfig parsed = parse(DEFAULT_TABLE, createValidConfig());
+
+        assertEquals("localhost", parsed.getHost());
+        assertEquals(8090, parsed.getPort());
+        assertEquals("iggy", parsed.getUsername());
+        assertEquals("iggy", parsed.getPassword());
+        assertEquals("analytics", parsed.getStreamId());
+        assertEquals("events", parsed.getTopicId());
+        assertEquals("test-consumer-group", parsed.getConsumerGroup());
+        assertEquals(100, parsed.getPollBatchSize());
+        assertEquals(4, parsed.getConnectionPoolSize());
+        assertFalse(parsed.isEnableTls());
+    }
+
+    /** Overridden properties replace the fixture values. */
+    @Test
+    void testCustomConfiguration() {
+        Map<String, String> overrides = createValidConfig();
+        overrides.put("stream.iggy.port", "9090");
+        overrides.put("stream.iggy.username", "custom-user");
+        overrides.put("stream.iggy.password", "custom-pass");
+        overrides.put("stream.iggy.poll.batch.size", "500");
+        overrides.put("stream.iggy.connection.pool.size", "8");
+        overrides.put("stream.iggy.enable.tls", "true");
+
+        IggyStreamConfig parsed = parse(DEFAULT_TABLE, overrides);
+
+        assertEquals(9090, parsed.getPort());
+        assertEquals("custom-user", parsed.getUsername());
+        assertEquals("custom-pass", parsed.getPassword());
+        assertEquals(500, parsed.getPollBatchSize());
+        assertEquals(8, parsed.getConnectionPoolSize());
+        assertTrue(parsed.isEnableTls());
+    }
+
+    @Test
+    void testMissingHostThrowsException() {
+        assertRejectedWithout("stream.iggy.host", "host");
+    }
+
+    @Test
+    void testMissingStreamIdThrowsException() {
+        assertRejectedWithout("stream.iggy.stream.id", "stream ID");
+    }
+
+    @Test
+    void testMissingTopicIdThrowsException() {
+        assertRejectedWithout("stream.iggy.topic.id", "topic ID");
+    }
+
+    @Test
+    void testMissingConsumerGroupThrowsException() {
+        assertRejectedWithout("stream.iggy.consumer.group", "consumer group");
+    }
+
+    /** The server address is rendered as host and port joined by a colon. */
+    @Test
+    void testServerAddress() {
+        IggyStreamConfig parsed = parse(DEFAULT_TABLE, createValidConfig());
+
+        assertEquals("localhost:8090", parsed.getServerAddress());
+    }
+
+    /** The table name given to the Pinot stream config is passed through. */
+    @Test
+    void testTableNameWithType() {
+        IggyStreamConfig parsed = parse("events_REALTIME", createValidConfig());
+
+        assertEquals("events_REALTIME", parsed.getTableNameWithType());
+    }
+
+    /** The string form mentions the connection target and the stream coordinates. */
+    @Test
+    void testToString() {
+        String rendered = parse(DEFAULT_TABLE, createValidConfig()).toString();
+
+        assertTrue(rendered.contains("localhost"));
+        assertTrue(rendered.contains("8090"));
+        assertTrue(rendered.contains("analytics"));
+        assertTrue(rendered.contains("events"));
+        assertTrue(rendered.contains("test-consumer-group"));
+    }
+
+    /** Numeric identifiers are kept as the strings they were configured with. */
+    @Test
+    void testNumericStreamAndTopicIds() {
+        Map<String, String> numericIds = createValidConfig();
+        numericIds.put("stream.iggy.stream.id", "123");
+        numericIds.put("stream.iggy.topic.id", "456");
+
+        IggyStreamConfig parsed = parse(DEFAULT_TABLE, numericIds);
+
+        assertEquals("123", parsed.getStreamId());
+        assertEquals("456", parsed.getTopicId());
+    }
+
+    /** Wraps the properties in a Pinot stream config and parses them. */
+    private static IggyStreamConfig parse(String tableNameWithType, Map<String, String> properties) {
+        return new IggyStreamConfig(new StreamConfig(tableNameWithType, properties));
+    }
+
+    /**
+     * Removes one property from the valid fixture and checks that parsing fails with an
+     * {@link IllegalArgumentException} whose message contains the given fragment.
+     */
+    private void assertRejectedWithout(String removedKey, String expectedMessageFragment) {
+        Map<String, String> incomplete = createValidConfig();
+        incomplete.remove(removedKey);
+
+        // Built outside assertThrows so only the IggyStreamConfig constructor is under test.
+        StreamConfig pinotConfig = new StreamConfig(DEFAULT_TABLE, incomplete);
+
+        IllegalArgumentException failure =
+                assertThrows(IllegalArgumentException.class, () -> new IggyStreamConfig(pinotConfig));
+
+        assertTrue(failure.getMessage().contains(expectedMessageFragment));
+    }
+
+    /** Returns a fresh, mutable property map describing a complete configuration. */
+    private Map<String, String> createValidConfig() {
+        Map<String, String> fixture = new HashMap<>();
+
+        // Entries that Pinot's own StreamConfig requires.
+        fixture.put("streamType", "iggy");
+        fixture.put("stream.iggy.topic.name", "events");
+        fixture.put("stream.iggy.consumer.type", "lowlevel");
+        fixture.put(
+                "stream.iggy.consumer.factory.class.name",
+                "org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory");
+        fixture.put("stream.iggy.decoder.class.name", "org.apache.iggy.connector.pinot.decoder.IggyJsonMessageDecoder");
+
+        // Connector-specific entries.
+        fixture.put("stream.iggy.host", "localhost");
+        fixture.put("stream.iggy.port", "8090");
+        fixture.put("stream.iggy.username", "iggy");
+        fixture.put("stream.iggy.password", "iggy");
+        fixture.put("stream.iggy.stream.id", "analytics");
+        fixture.put("stream.iggy.topic.id", "events");
+        fixture.put("stream.iggy.consumer.group", "test-consumer-group");
+        fixture.put("stream.iggy.poll.batch.size", "100");
+        fixture.put("stream.iggy.connection.pool.size", "4");
+        fixture.put("stream.iggy.enable.tls", "false");
+        return fixture;
+    }
+}
diff --git 
a/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/consumer/IggyMessageBatchTest.java
 
b/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/consumer/IggyMessageBatchTest.java
new file mode 100644
index 000000000..aca0be844
--- /dev/null
+++ 
b/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/consumer/IggyMessageBatchTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.consumer;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+class IggyMessageBatchTest {
+
+    /** A batch built from an empty list reports zero messages. */
+    @Test
+    void testEmptyBatch() {
+        IggyMessageBatch emptyBatch = new IggyMessageBatch(new ArrayList<>());
+        assertEquals(0, emptyBatch.getMessageCount());
+    }
+
+    /** Payload, length and offset of a lone message are reported at index 0. */
+    @Test
+    void testSingleMessage() {
+        byte[] body = utf8("test message");
+        IggyStreamPartitionMsgOffset position = new IggyStreamPartitionMsgOffset(100L);
+
+        IggyMessageBatch batch = batchOf(new IggyMessageBatch.IggyMessageAndOffset(body, position));
+
+        assertEquals(1, batch.getMessageCount());
+        assertArrayEquals(body, batch.getMessageAtIndex(0));
+        assertEquals(body.length, batch.getMessageLengthAtIndex(0));
+        assertEquals(100L, batch.getNextStreamMessageOffsetAtIndex(0));
+        assertEquals(position, batch.getNextStreamPartitionMsgOffsetAtIndex(0));
+    }
+
+    /** Ten messages keep their insertion order, payloads and offsets. */
+    @Test
+    void testMultipleMessages() {
+        final int total = 10;
+        List<IggyMessageBatch.IggyMessageAndOffset> entries = new ArrayList<>();
+        for (int i = 0; i < total; i++) {
+            entries.add(new IggyMessageBatch.IggyMessageAndOffset(
+                    utf8("message-" + i), new IggyStreamPartitionMsgOffset(i * 100L)));
+        }
+
+        IggyMessageBatch batch = new IggyMessageBatch(entries);
+
+        assertEquals(10, batch.getMessageCount());
+
+        for (int i = 0; i < total; i++) {
+            byte[] expectedBody = utf8("message-" + i);
+            assertArrayEquals(expectedBody, batch.getMessageAtIndex(i));
+            assertEquals(expectedBody.length, batch.getMessageLengthAtIndex(i));
+            assertEquals(i * 100L, batch.getNextStreamMessageOffsetAtIndex(i));
+            assertEquals(i, batch.getMessageOffsetAtIndex(i));
+        }
+    }
+
+    /** The message/offset holder returns exactly what it was constructed with. */
+    @Test
+    void testMessageAndOffsetWrapper() {
+        byte[] body = utf8("test");
+        IggyStreamPartitionMsgOffset position = new IggyStreamPartitionMsgOffset(123L);
+
+        IggyMessageBatch.IggyMessageAndOffset holder = new IggyMessageBatch.IggyMessageAndOffset(body, position);
+
+        assertArrayEquals(body, holder.getMessage());
+        assertEquals(position, holder.getOffset());
+        assertEquals(123L, holder.getOffset().getOffset());
+    }
+
+    /** Out-of-range indexes yield a null offset object and a zero numeric offset. */
+    @Test
+    void testNullOffsetAtInvalidIndex() {
+        IggyMessageBatch batch = batchOf(
+                new IggyMessageBatch.IggyMessageAndOffset(utf8("test"), new IggyStreamPartitionMsgOffset(100L)));
+
+        assertNull(batch.getNextStreamPartitionMsgOffsetAtIndex(-1));
+        assertNull(batch.getNextStreamPartitionMsgOffsetAtIndex(10));
+        assertEquals(0, batch.getNextStreamMessageOffsetAtIndex(-1));
+        assertEquals(0, batch.getNextStreamMessageOffsetAtIndex(10));
+    }
+
+    /** A batch of 1000 one-kilobyte messages reports the right count and lengths. */
+    @Test
+    void testLargeMessageBatch() {
+        List<IggyMessageBatch.IggyMessageAndOffset> entries = new ArrayList<>();
+        for (int i = 0; i < 1000; i++) {
+            // 1KB per message
+            entries.add(new IggyMessageBatch.IggyMessageAndOffset(new byte[1024], new IggyStreamPartitionMsgOffset(i)));
+        }
+
+        IggyMessageBatch batch = new IggyMessageBatch(entries);
+
+        assertEquals(1000, batch.getMessageCount());
+        assertEquals(1024, batch.getMessageLengthAtIndex(0));
+        assertEquals(1024, batch.getMessageLengthAtIndex(999));
+    }
+
+    /** Encodes the text as UTF-8 bytes. */
+    private static byte[] utf8(String text) {
+        return text.getBytes(StandardCharsets.UTF_8);
+    }
+
+    /** Builds a batch that contains exactly the given entry. */
+    private static IggyMessageBatch batchOf(IggyMessageBatch.IggyMessageAndOffset onlyEntry) {
+        List<IggyMessageBatch.IggyMessageAndOffset> entries = new ArrayList<>();
+        entries.add(onlyEntry);
+        return new IggyMessageBatch(entries);
+    }
+}
diff --git 
a/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/consumer/IggyStreamPartitionMsgOffsetTest.java
 
b/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/consumer/IggyStreamPartitionMsgOffsetTest.java
new file mode 100644
index 000000000..d3841b784
--- /dev/null
+++ 
b/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/consumer/IggyStreamPartitionMsgOffsetTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.consumer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class IggyStreamPartitionMsgOffsetTest {
+
+    /** The value passed to the constructor is returned by the getter. */
+    @Test
+    void testOffsetCreation() {
+        assertEquals(100L, at(100L).getOffset());
+    }
+
+    /** Ordering follows the numeric offset value. */
+    @Test
+    void testCompareTo() {
+        IggyStreamPartitionMsgOffset lower = at(100L);
+        IggyStreamPartitionMsgOffset higher = at(200L);
+        IggyStreamPartitionMsgOffset sameAsLower = at(100L);
+
+        assertTrue(lower.compareTo(higher) < 0);
+        assertTrue(higher.compareTo(lower) > 0);
+        assertEquals(0, lower.compareTo(sameAsLower));
+    }
+
+    /** Equality is value-based and rejects null and foreign types. */
+    @Test
+    void testEquals() {
+        IggyStreamPartitionMsgOffset base = at(100L);
+        IggyStreamPartitionMsgOffset twin = at(100L);
+        IggyStreamPartitionMsgOffset other = at(200L);
+
+        assertEquals(base, twin);
+        assertNotEquals(base, other);
+        assertEquals(base, base);
+        assertNotEquals(base, null);
+        assertNotEquals(base, "string");
+    }
+
+    /** Equal offsets share a hash code; these two distinct offsets do not. */
+    @Test
+    void testHashCode() {
+        IggyStreamPartitionMsgOffset base = at(100L);
+        IggyStreamPartitionMsgOffset twin = at(100L);
+        IggyStreamPartitionMsgOffset other = at(200L);
+
+        assertEquals(base.hashCode(), twin.hashCode());
+        assertNotEquals(base.hashCode(), other.hashCode());
+    }
+
+    /** The string form is the plain decimal offset. */
+    @Test
+    void testToString() {
+        assertEquals("12345", at(12345L).toString());
+    }
+
+    /** Zero is a valid offset. */
+    @Test
+    void testZeroOffset() {
+        IggyStreamPartitionMsgOffset zero = at(0L);
+
+        assertEquals(0L, zero.getOffset());
+        assertEquals("0", zero.toString());
+    }
+
+    /** Values close to {@code Long.MAX_VALUE} are stored and printed without loss. */
+    @Test
+    void testLargeOffset() {
+        long nearMax = Long.MAX_VALUE - 1;
+        IggyStreamPartitionMsgOffset huge = at(nearMax);
+
+        assertEquals(nearMax, huge.getOffset());
+        assertEquals(String.valueOf(nearMax), huge.toString());
+    }
+
+    /** Shorthand for constructing an offset with the given value. */
+    private static IggyStreamPartitionMsgOffset at(long value) {
+        return new IggyStreamPartitionMsgOffset(value);
+    }
+}
diff --git a/foreign/java/gradle/libs.versions.toml 
b/foreign/java/gradle/libs.versions.toml
index 63cd5b3bd..dd3e8408c 100644
--- a/foreign/java/gradle/libs.versions.toml
+++ b/foreign/java/gradle/libs.versions.toml
@@ -19,8 +19,12 @@
 # Flink
 flink = "2.1.1"
 
+# Pinot
+pinot = "1.4.0"
+
 # Jackson
 jackson = "3.0.2"
+jackson2 = "2.18.2"
 
 # Apache Commons
 commons-lang3 = "3.20.0"
@@ -58,6 +62,10 @@ checkstyle = "12.1.2"
 [libraries]
 # Jackson
 jackson-databind = { module = "tools.jackson.core:jackson-databind", 
version.ref = "jackson" }
+jackson2-databind = { module = "com.fasterxml.jackson.core:jackson-databind", 
version.ref = "jackson2" }
+
+# Pinot
+pinot-spi = { module = "org.apache.pinot:pinot-spi", version.ref = "pinot" }
 
 # Apache HTTP Client
 httpclient5 = { module = "org.apache.httpcomponents.client5:httpclient5", 
version.ref = "httpclient5" }
diff --git a/foreign/java/settings.gradle.kts b/foreign/java/settings.gradle.kts
index 050119ed8..477beb85d 100644
--- a/foreign/java/settings.gradle.kts
+++ b/foreign/java/settings.gradle.kts
@@ -28,3 +28,6 @@ project(":iggy-connector-library").projectDir = 
file("external-processors/iggy-c
 
 include("iggy-flink-examples")
 project(":iggy-flink-examples").projectDir = 
file("external-processors/iggy-connector-flink/iggy-flink-examples")
+
+include("iggy-connector-pinot")
+project(":iggy-connector-pinot").projectDir = 
file("external-processors/iggy-connector-pinot")


Reply via email to