This is an automated email from the ASF dual-hosted git repository.
maciej pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new efd8809f8 feat(connector): add iggy-pinot external connector (#2499)
efd8809f8 is described below
commit efd8809f8ee6c9c0576534a245a2698847373aa3
Author: Chiradip Mandal <[email protected]>
AuthorDate: Wed Jan 21 00:39:59 2026 -0800
feat(connector): add iggy-pinot external connector (#2499)
Co-authored-by: Maciej Modzelewski <[email protected]>
---
.../iggy-connector-pinot/build.gradle.kts | 71 ++++++
.../iggy-connector-pinot/deployment/schema.json | 31 +++
.../iggy-connector-pinot/deployment/table.json | 42 ++++
.../iggy-connector-pinot/docker-compose.yml | 135 +++++++++++
.../examples/sample-messages.json | 47 ++++
.../iggy-connector-pinot/examples/schema.json | 59 +++++
.../examples/table-config.json | 43 ++++
.../iggy-connector-pinot/integration-test.sh | 234 ++++++++++++++++++
.../connector/pinot/config/IggyStreamConfig.java | 206 ++++++++++++++++
.../pinot/consumer/IggyConsumerFactory.java | 116 +++++++++
.../connector/pinot/consumer/IggyMessageBatch.java | 130 ++++++++++
.../pinot/consumer/IggyPartitionGroupConsumer.java | 265 +++++++++++++++++++++
.../pinot/consumer/IggyPartitionLevelConsumer.java | 49 ++++
.../consumer/IggyStreamPartitionMsgOffset.java | 80 +++++++
.../pinot/decoder/IggyJsonMessageDecoder.java | 97 ++++++++
.../pinot/metadata/IggyStreamMetadataProvider.java | 243 +++++++++++++++++++
...g.apache.pinot.spi.stream.StreamConsumerFactory | 16 ++
.../src/main/resources/pinot-plugin.properties | 31 +++
.../pinot/config/IggyStreamConfigTest.java | 193 +++++++++++++++
.../pinot/consumer/IggyMessageBatchTest.java | 123 ++++++++++
.../consumer/IggyStreamPartitionMsgOffsetTest.java | 90 +++++++
foreign/java/gradle/libs.versions.toml | 8 +
foreign/java/settings.gradle.kts | 3 +
23 files changed, 2312 insertions(+)
diff --git
a/foreign/java/external-processors/iggy-connector-pinot/build.gradle.kts
b/foreign/java/external-processors/iggy-connector-pinot/build.gradle.kts
new file mode 100644
index 000000000..89e986978
--- /dev/null
+++ b/foreign/java/external-processors/iggy-connector-pinot/build.gradle.kts
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+ id("iggy.java-library-conventions")
+}
+
+dependencies {
+ // Iggy SDK - use local project when building within Iggy repository
+ api(project(":iggy"))
+
+ // Apache Pinot dependencies (provided - not bundled with connector)
+ compileOnly(libs.pinot.spi)
+
+ // Serialization support - use Jackson 2.x for Pinot compatibility
+ implementation(libs.jackson2.databind) {
+ exclude(group = "tools.jackson.core")
+ }
+
+ // Apache Commons
+ implementation(libs.commons.lang3)
+
+ // Logging
+ compileOnly(libs.slf4j.api)
+
+ // Testing
+ testImplementation(platform(libs.junit.bom))
+ testImplementation(libs.bundles.testing)
+ testImplementation(libs.pinot.spi) // Need Pinot SPI for tests
+ testRuntimeOnly(libs.slf4j.simple)
+}
+
+// Task to copy runtime dependencies for Docker deployment (flattened into libs directory)
+tasks.register<Copy>("copyDependencies") {
+ from(configurations.runtimeClasspath)
+ into(layout.buildDirectory.dir("libs"))
+}
+
+// Run copyDependencies after the jar task finishes (finalizedBy, not dependsOn)
+tasks.named("jar") {
+ finalizedBy("copyDependencies")
+}
+
+publishing {
+ publications {
+ named<MavenPublication>("maven") {
+ artifactId = "pinot-connector"
+
+ pom {
+ name = "Apache Iggy - Pinot Connector"
+ description = "Apache Iggy connector plugin for Apache Pinot
stream ingestion"
+ }
+ }
+ }
+}
diff --git
a/foreign/java/external-processors/iggy-connector-pinot/deployment/schema.json
b/foreign/java/external-processors/iggy-connector-pinot/deployment/schema.json
new file mode 100644
index 000000000..d7ff8496c
--- /dev/null
+++
b/foreign/java/external-processors/iggy-connector-pinot/deployment/schema.json
@@ -0,0 +1,31 @@
+{
+ "schemaName": "test_events",
+ "dimensionFieldSpecs": [
+ {
+ "name": "userId",
+ "dataType": "STRING"
+ },
+ {
+ "name": "eventType",
+ "dataType": "STRING"
+ },
+ {
+ "name": "deviceType",
+ "dataType": "STRING"
+ }
+ ],
+ "metricFieldSpecs": [
+ {
+ "name": "duration",
+ "dataType": "LONG"
+ }
+ ],
+ "dateTimeFieldSpecs": [
+ {
+ "name": "timestamp",
+ "dataType": "LONG",
+ "format": "1:MILLISECONDS:EPOCH",
+ "granularity": "1:MILLISECONDS"
+ }
+ ]
+}
diff --git
a/foreign/java/external-processors/iggy-connector-pinot/deployment/table.json
b/foreign/java/external-processors/iggy-connector-pinot/deployment/table.json
new file mode 100644
index 000000000..f6af85365
--- /dev/null
+++
b/foreign/java/external-processors/iggy-connector-pinot/deployment/table.json
@@ -0,0 +1,42 @@
+{
+ "tableName": "test_events",
+ "tableType": "REALTIME",
+ "segmentsConfig": {
+ "timeColumnName": "timestamp",
+ "timeType": "MILLISECONDS",
+ "replication": "1",
+ "schemaName": "test_events"
+ },
+ "tenants": {
+ "broker": "DefaultTenant",
+ "server": "DefaultTenant"
+ },
+ "tableIndexConfig": {
+ "loadMode": "MMAP",
+ "streamConfigs": {
+ "streamType": "iggy",
+ "stream.iggy.topic.name": "test-events",
+ "stream.iggy.consumer.type": "lowlevel",
+ "stream.iggy.consumer.factory.class.name":
"org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory",
+ "realtime.segment.consumer.factory.class.name":
"org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory",
+ "stream.iggy.decoder.class.name":
"org.apache.iggy.connector.pinot.decoder.IggyJsonMessageDecoder",
+
+ "stream.iggy.host": "iggy",
+ "stream.iggy.port": "8090",
+ "stream.iggy.username": "iggy",
+ "stream.iggy.password": "iggy",
+
+ "stream.iggy.stream.id": "test-stream",
+ "stream.iggy.topic.id": "test-events",
+ "stream.iggy.consumer.group": "pinot-integration-test",
+
+ "stream.iggy.poll.batch.size": "100",
+ "stream.iggy.connection.pool.size": "4",
+ "stream.iggy.consumer.prop.auto.offset.reset": "smallest",
+
+ "realtime.segment.flush.threshold.rows": "1000",
+ "realtime.segment.flush.threshold.time": "600000"
+ }
+ },
+ "metadata": {}
+}
diff --git
a/foreign/java/external-processors/iggy-connector-pinot/docker-compose.yml
b/foreign/java/external-processors/iggy-connector-pinot/docker-compose.yml
new file mode 100644
index 000000000..889338fd2
--- /dev/null
+++ b/foreign/java/external-processors/iggy-connector-pinot/docker-compose.yml
@@ -0,0 +1,135 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+services:
+ # Apache Iggy Server (from official Apache repo)
+  iggy:
+    image: apache/iggy:latest
+    container_name: iggy-server
+    ports:
+      - "8090:8090" # TCP
+      - "3000:3000" # HTTP
+      # QUIC runs over UDP; Compose publishes TCP unless the protocol is given
+      - "8080:8080/udp" # QUIC
+    environment:
+      - IGGY_SYSTEM_LOGGING_LEVEL=info
+      - IGGY_TCP_ADDRESS=0.0.0.0:8090
+      - IGGY_HTTP_ENABLED=true
+      - IGGY_HTTP_ADDRESS=0.0.0.0:3000
+      - IGGY_QUIC_ADDRESS=0.0.0.0:8080
+      - IGGY_ROOT_USERNAME=iggy
+      - IGGY_ROOT_PASSWORD=iggy
+    cap_add:
+      - SYS_NICE
+    security_opt:
+      - seccomp:unconfined
+    ulimits:
+      memlock:
+        soft: -1
+        hard: -1
+    # Probes the HTTP listener configured via IGGY_HTTP_ADDRESS above
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:3000/"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+      start_period: 10s
+    networks:
+      - iggy-pinot-network
+
+ # Zookeeper (required by Pinot)
+ zookeeper:
+ image: zookeeper:3.9
+ container_name: pinot-zookeeper
+ ports:
+ - "2181:2181"
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+ networks:
+ - iggy-pinot-network
+
+ # Apache Pinot Controller
+ pinot-controller:
+ image: apachepinot/pinot:latest
+ container_name: pinot-controller
+ command: "StartController -zkAddress zookeeper:2181"
+ ports:
+ - "9000:9000"
+ environment:
+ JAVA_OPTS: "-Xms1G -Xmx2G -XX:+UseG1GC"
+ depends_on:
+ - zookeeper
+ healthcheck:
+ test: ["CMD", "curl", "-f", "http://localhost:9000/health"]
+ interval: 10s
+ timeout: 5s
+ retries: 10
+ start_period: 30s
+ volumes:
+ - ./build/libs:/opt/pinot/plugins/pinot-stream-ingestion/iggy-connector
+ -
../../java-sdk/build/libs/iggy-0.6.0.jar:/opt/pinot/plugins/pinot-stream-ingestion/iggy-connector/iggy-0.6.0.jar
+ - ./deployment:/opt/pinot/deployment
+ networks:
+ - iggy-pinot-network
+
+ # Apache Pinot Broker
+ pinot-broker:
+ image: apachepinot/pinot:latest
+ container_name: pinot-broker
+ command: "StartBroker -zkAddress zookeeper:2181"
+ ports:
+ - "8099:8099"
+ environment:
+ JAVA_OPTS: "-Xms512M -Xmx1G -XX:+UseG1GC"
+ depends_on:
+ - pinot-controller
+ healthcheck:
+ test: ["CMD", "curl", "-f", "http://localhost:8099/health"]
+ interval: 10s
+ timeout: 5s
+ retries: 10
+ start_period: 30s
+ networks:
+ - iggy-pinot-network
+
+ # Apache Pinot Server
+ pinot-server:
+ image: apachepinot/pinot:latest
+ container_name: pinot-server
+ command: "StartServer -zkAddress zookeeper:2181"
+ ports:
+ - "8098:8098"
+ - "8097:8097"
+ environment:
+ JAVA_OPTS: "-Xms1G -Xmx2G -XX:+UseG1GC -Dplugins.include=iggy-connector"
+ depends_on:
+ - pinot-broker
+ healthcheck:
+ test: ["CMD", "curl", "-f", "http://localhost:8097/health"]
+ interval: 10s
+ timeout: 5s
+ retries: 10
+ start_period: 30s
+ volumes:
+ - ./build/libs:/opt/pinot/plugins/pinot-stream-ingestion/iggy-connector
+ -
../../java-sdk/build/libs/iggy-0.6.0.jar:/opt/pinot/plugins/pinot-stream-ingestion/iggy-connector/iggy-0.6.0.jar
+ - ./deployment:/opt/pinot/deployment
+ networks:
+ - iggy-pinot-network
+
+networks:
+ iggy-pinot-network:
+ driver: bridge
diff --git
a/foreign/java/external-processors/iggy-connector-pinot/examples/sample-messages.json
b/foreign/java/external-processors/iggy-connector-pinot/examples/sample-messages.json
new file mode 100644
index 000000000..df0cbcfaa
--- /dev/null
+++
b/foreign/java/external-processors/iggy-connector-pinot/examples/sample-messages.json
@@ -0,0 +1,47 @@
+{
+ "description": "Sample messages to send to Iggy for testing Pinot ingestion",
+ "messages": [
+ {
+ "userId": "user_12345",
+ "sessionId": "session_abc123",
+ "eventType": "page_view",
+ "pageUrl": "/products/laptop",
+ "deviceType": "desktop",
+ "browser": "Chrome",
+ "country": "USA",
+ "city": "San Francisco",
+ "duration": 45000,
+ "pageLoadTime": 1200,
+ "scrollDepth": 75,
+ "eventTimestamp": 1701234567890
+ },
+ {
+ "userId": "user_67890",
+ "sessionId": "session_def456",
+ "eventType": "click",
+ "pageUrl": "/checkout",
+ "deviceType": "mobile",
+ "browser": "Safari",
+ "country": "UK",
+ "city": "London",
+ "duration": 2000,
+ "pageLoadTime": 800,
+ "scrollDepth": 100,
+ "eventTimestamp": 1701234570000
+ },
+ {
+ "userId": "user_12345",
+ "sessionId": "session_abc123",
+ "eventType": "purchase",
+ "pageUrl": "/confirmation",
+ "deviceType": "desktop",
+ "browser": "Chrome",
+ "country": "USA",
+ "city": "San Francisco",
+ "duration": 10000,
+ "pageLoadTime": 950,
+ "scrollDepth": 50,
+ "eventTimestamp": 1701234580000
+ }
+ ]
+}
diff --git
a/foreign/java/external-processors/iggy-connector-pinot/examples/schema.json
b/foreign/java/external-processors/iggy-connector-pinot/examples/schema.json
new file mode 100644
index 000000000..77b226091
--- /dev/null
+++ b/foreign/java/external-processors/iggy-connector-pinot/examples/schema.json
@@ -0,0 +1,59 @@
+{
+ "schemaName": "user_events",
+ "dimensionFieldSpecs": [
+ {
+ "name": "userId",
+ "dataType": "STRING"
+ },
+ {
+ "name": "sessionId",
+ "dataType": "STRING"
+ },
+ {
+ "name": "eventType",
+ "dataType": "STRING"
+ },
+ {
+ "name": "pageUrl",
+ "dataType": "STRING"
+ },
+ {
+ "name": "deviceType",
+ "dataType": "STRING"
+ },
+ {
+ "name": "browser",
+ "dataType": "STRING"
+ },
+ {
+ "name": "country",
+ "dataType": "STRING"
+ },
+ {
+ "name": "city",
+ "dataType": "STRING"
+ }
+ ],
+ "metricFieldSpecs": [
+ {
+ "name": "duration",
+ "dataType": "LONG"
+ },
+ {
+ "name": "pageLoadTime",
+ "dataType": "INT"
+ },
+ {
+ "name": "scrollDepth",
+ "dataType": "INT"
+ }
+ ],
+ "dateTimeFieldSpecs": [
+ {
+ "name": "eventTimestamp",
+ "dataType": "LONG",
+ "format": "1:MILLISECONDS:EPOCH",
+ "granularity": "1:MILLISECONDS"
+ }
+ ]
+}
diff --git
a/foreign/java/external-processors/iggy-connector-pinot/examples/table-config.json
b/foreign/java/external-processors/iggy-connector-pinot/examples/table-config.json
new file mode 100644
index 000000000..a6b16ad95
--- /dev/null
+++
b/foreign/java/external-processors/iggy-connector-pinot/examples/table-config.json
@@ -0,0 +1,43 @@
+{
+ "tableName": "user_events_realtime",
+ "tableType": "REALTIME",
+ "segmentsConfig": {
+ "timeColumnName": "eventTimestamp",
+ "timeType": "MILLISECONDS",
+ "replication": "1",
+ "schemaName": "user_events"
+ },
+ "tenants": {
+ "broker": "DefaultTenant",
+ "server": "DefaultTenant"
+ },
+ "tableIndexConfig": {
+ "loadMode": "MMAP",
+ "streamConfigs": {
+ "streamType": "iggy",
+ "stream.iggy.consumer.type": "lowlevel",
+ "stream.iggy.consumer.factory.class.name":
"org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory",
+ "stream.iggy.decoder.class.name":
"org.apache.iggy.connector.pinot.decoder.IggyJsonMessageDecoder",
+
+ "stream.iggy.host": "localhost",
+ "stream.iggy.port": "8090",
+ "stream.iggy.username": "iggy",
+ "stream.iggy.password": "iggy",
+ "stream.iggy.enable.tls": "false",
+
+ "stream.iggy.stream.id": "analytics",
+ "stream.iggy.topic.id": "user-events",
+ "stream.iggy.consumer.group": "pinot-realtime-consumer",
+
+ "stream.iggy.poll.batch.size": "1000",
+ "stream.iggy.connection.pool.size": "8",
+
+ "realtime.segment.flush.threshold.rows": "50000",
+ "realtime.segment.flush.threshold.time": "3600000",
+ "realtime.segment.flush.threshold.segment.size": "100M"
+ }
+ },
+ "metadata": {
+ "customConfigs": {}
+ }
+}
diff --git
a/foreign/java/external-processors/iggy-connector-pinot/integration-test.sh
b/foreign/java/external-processors/iggy-connector-pinot/integration-test.sh
new file mode 100755
index 000000000..4e42a37bf
--- /dev/null
+++ b/foreign/java/external-processors/iggy-connector-pinot/integration-test.sh
@@ -0,0 +1,234 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -e
+
+# Colors for output
+GREEN='\033[0;32m'
+YELLOW='\033[1;33m'
+RED='\033[0;31m'
+NC='\033[0m' # No Color
+
+echo -e "${GREEN}=====================================${NC}"
+echo -e "${GREEN}Iggy-Pinot Integration Test${NC}"
+echo -e "${GREEN}=====================================${NC}"
+
+# Navigate to connector directory
+cd "$(dirname "$0")"
+
+# Step 1: Build JARs
+echo -e "\n${YELLOW}Step 1: Building JARs...${NC}"
+cd ../../
+gradle :iggy-connector-pinot:jar :iggy:jar
+cd external-processors/iggy-connector-pinot
+echo -e "${GREEN}✓ JARs built successfully${NC}"
+
+# Step 2: Start Docker environment
+echo -e "\n${YELLOW}Step 2: Starting Docker environment...${NC}"
+docker-compose down -v
+docker-compose up -d
+echo -e "${GREEN}✓ Docker containers starting${NC}"
+
+# Step 3: Wait for services to be healthy
+echo -e "\n${YELLOW}Step 3: Waiting for services to be healthy...${NC}"
+
+echo -n "Waiting for Iggy... "
+for i in {1..30}; do
+ if curl --connect-timeout 3 --max-time 5 -s http://localhost:3000/ >
/dev/null 2>&1; then
+ echo -e "${GREEN}✓${NC}"
+ break
+ fi
+ sleep 2
+ echo -n "."
+done
+
+echo -n "Waiting for Pinot Controller... "
+for i in {1..60}; do
+ if curl --connect-timeout 3 --max-time 5 -s http://localhost:9000/health >
/dev/null 2>&1; then
+ echo -e "${GREEN}✓${NC}"
+ break
+ fi
+ sleep 2
+ echo -n "."
+done
+
+echo -n "Waiting for Pinot Broker... "
+for i in {1..60}; do
+ if curl --connect-timeout 3 --max-time 5 -s http://localhost:8099/health >
/dev/null 2>&1; then
+ echo -e "${GREEN}✓${NC}"
+ break
+ fi
+ sleep 2
+ echo -n "."
+done
+
+echo -n "Waiting for Pinot Server... "
+for i in {1..60}; do
+ if curl --connect-timeout 3 --max-time 5 -s http://localhost:8097/health >
/dev/null 2>&1; then
+ echo -e "${GREEN}✓${NC}"
+ break
+ fi
+ sleep 2
+ echo -n "."
+done
+
+sleep 5 # Extra time for services to stabilize
+
+# Step 4: Login to Iggy and create stream/topic
+echo -e "\n${YELLOW}Step 4: Logging in to Iggy and creating
stream/topic...${NC}"
+
+# Login and get JWT token
+TOKEN=$(curl -s -X POST "http://localhost:3000/users/login" \
+ -H "Content-Type: application/json" \
+ -d '{"username": "iggy", "password": "iggy"}' | jq -r '.access_token.token')
+
+if [ -z "$TOKEN" ] || [ "$TOKEN" = "null" ]; then
+ echo -e "${RED}✗ Failed to get authentication token${NC}"
+ exit 1
+fi
+
+echo -e "${GREEN}✓ Authenticated${NC}"
+
+# Create stream
+curl -s -X POST "http://localhost:3000/streams" \
+ -H "Authorization: Bearer $TOKEN" \
+ -H "Content-Type: application/json" \
+ -d '{"stream_id": 1, "name": "test-stream"}' \
+ && echo -e "${GREEN}✓ Stream created${NC}" || echo -e "${RED}✗ Stream
creation failed (may already exist)${NC}"
+
+# Create topic
+TOPIC_RESPONSE=$(curl -s -X POST
"http://localhost:3000/streams/test-stream/topics" \
+ -H "Authorization: Bearer $TOKEN" \
+ -H "Content-Type: application/json" \
+ -d '{"topic_id": 1, "name": "test-events", "partitions_count": 2,
"compression_algorithm": "none", "message_expiry": 0, "max_topic_size": 0}')
+
+if echo "$TOPIC_RESPONSE" | grep -q '"id"'; then
+ echo -e "${GREEN}✓ Topic created${NC}"
+else
+ echo -e "${RED}✗ Topic creation failed: $TOPIC_RESPONSE${NC}"
+ exit 1
+fi
+
+# Create consumer group (topic-scoped, not stream-scoped)
+curl -s -X POST
"http://localhost:3000/streams/test-stream/topics/test-events/consumer-groups" \
+ -H "Authorization: Bearer $TOKEN" \
+ -H "Content-Type: application/json" \
+ -d '{"name": "pinot-integration-test"}' \
+ && echo -e "${GREEN}✓ Consumer group created${NC}" || echo -e
"${YELLOW}Note: Consumer group may already exist${NC}"
+
+# Step 5: Create Pinot schema
+echo -e "\n${YELLOW}Step 5: Creating Pinot schema...${NC}"
+curl -X POST "http://localhost:9000/schemas" \
+ -H "Content-Type: application/json" \
+ -d @deployment/schema.json \
+ && echo -e "${GREEN}✓ Schema created${NC}" || echo -e "${RED}✗ Schema
creation failed${NC}"
+
+# Step 6: Create Pinot table
+echo -e "\n${YELLOW}Step 6: Creating Pinot realtime table...${NC}"
+TABLE_RESPONSE=$(curl -s -X POST "http://localhost:9000/tables" \
+    -H "Content-Type: application/json" \
+    -d @deployment/table.json)
+
+# The confirmation was originally matched with the misspelling "succesfully";
+# "succes+fully" also accepts the correct spelling in case the image in use
+# words it that way.
+if echo "$TABLE_RESPONSE" | grep -Eq 'Table test_events_REALTIME succes+fully added'; then
+    echo -e "${GREEN}✓ Table created${NC}"
+# An empty body or any payload carrying "code"/"error" is a failure, not only "code":500
+elif [ -z "$TABLE_RESPONSE" ] || echo "$TABLE_RESPONSE" | grep -Eq '"(code|error)"[[:space:]]*:'; then
+    echo -e "${RED}✗ Table creation failed${NC}"
+    # Pretty-print when the body is JSON, otherwise show it verbatim
+    echo "$TABLE_RESPONSE" | jq '.' 2>/dev/null || echo "$TABLE_RESPONSE"
+    exit 1
+else
+    # Unknown wording: keep going as before, but do not claim success
+    echo -e "${YELLOW}Note: unrecognized table creation response, continuing: $TABLE_RESPONSE${NC}"
+fi
+
+sleep 5 # Let table initialize
+
+# Step 7: Send test messages to Iggy
+echo -e "\n${YELLOW}Step 7: Sending test messages to Iggy...${NC}"
+
+# Partition value for partition 0 (4-byte little-endian, base64 encoded)
+PARTITION_VALUE=$(printf '\x00\x00\x00\x00' | base64 | tr -d '\n')
+
+for i in {1..10}; do
+    TIMESTAMP=$(($(date +%s) * 1000))
+    MESSAGE=$(cat <<EOF
+{
+  "userId": "user$i",
+  "eventType": "test_event",
+  "deviceType": "desktop",
+  "duration": $((i * 100)),
+  "timestamp": $TIMESTAMP
+}
+EOF
+)
+
+    # GNU base64 wraps its output at 76 columns; strip the line breaks so the
+    # encoded payload stays a single valid JSON string value.
+    PAYLOAD=$(echo "$MESSAGE" | base64 | tr -d '\n')
+
+    # -f turns HTTP error statuses into a non-zero exit, so a rejected message
+    # is reported as a failure instead of being announced as sent.
+    if curl -sf -X POST "http://localhost:3000/streams/test-stream/topics/test-events/messages" \
+        -H "Authorization: Bearer $TOKEN" \
+        -H "Content-Type: application/json" \
+        -d "{\"partitioning\": {\"kind\": \"partition_id\", \"value\": \"$PARTITION_VALUE\"}, \"messages\": [{\"payload\": \"$PAYLOAD\"}]}" \
+        > /dev/null 2>&1; then
+        echo -e "${GREEN}✓ Message $i sent${NC}"
+    else
+        echo -e "${RED}✗ Failed to send message $i${NC}"
+        exit 1
+    fi
+    sleep 1
+done
+
+# Step 8: Wait for ingestion
+echo -e "\n${YELLOW}Step 8: Waiting for Pinot to ingest messages...${NC}"
+sleep 15
+
+# Step 9: Query Pinot and verify data
+echo -e "\n${YELLOW}Step 9: Querying Pinot for ingested data...${NC}"
+
+QUERY_RESULT=$(curl -s -X POST "http://localhost:8099/query/sql" \
+ -H "Content-Type: application/json" \
+ -d '{"sql": "SELECT COUNT(*) FROM test_events_REALTIME"}')
+
+echo "Query Result:"
+echo "$QUERY_RESULT" | jq '.'
+
+# Extract count from result
+COUNT=$(echo "$QUERY_RESULT" | jq -r '.resultTable.rows[0][0]' 2>/dev/null ||
echo "0")
+
+if [ "$COUNT" -gt "0" ]; then
+ echo -e "\n${GREEN}=====================================${NC}"
+ echo -e "${GREEN}✓ Integration Test PASSED!${NC}"
+ echo -e "${GREEN}Successfully ingested $COUNT messages${NC}"
+ echo -e "${GREEN}=====================================${NC}"
+
+ # Show sample data
+ echo -e "\n${YELLOW}Sample data:${NC}"
+ curl -s -X POST "http://localhost:8099/query/sql" \
+ -H "Content-Type: application/json" \
+ -d '{"sql": "SELECT * FROM test_events_REALTIME LIMIT 5"}' | jq '.'
+
+ EXIT_CODE=0
+else
+ echo -e "\n${RED}=====================================${NC}"
+ echo -e "${RED}✗ Integration Test FAILED!${NC}"
+ echo -e "${RED}No messages ingested${NC}"
+ echo -e "${RED}=====================================${NC}"
+
+ # Show logs for debugging
+ echo -e "\n${YELLOW}Pinot Server logs:${NC}"
+ docker logs pinot-server --tail 50
+
+ EXIT_CODE=1
+fi
+
+# Cleanup option
+echo -e "\n${YELLOW}To stop the environment: docker-compose down -v${NC}"
+echo -e "${YELLOW}To view logs: docker-compose logs -f${NC}"
+
+exit $EXIT_CODE
diff --git
a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/config/IggyStreamConfig.java
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/config/IggyStreamConfig.java
new file mode 100644
index 000000000..28ef8c55d
--- /dev/null
+++
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/config/IggyStreamConfig.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.config;
+
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.StreamConfig;
+
+import java.util.Map;
+
+/**
+ * Configuration class for Iggy stream ingestion in Pinot.
+ * Extracts and validates Iggy-specific properties from Pinot's streamConfigs.
+ *
+ * <p>Configuration properties (with prefix "stream.iggy."):
+ * <ul>
+ *   <li>host - Iggy server hostname (required)</li>
+ *   <li>port - Iggy server TCP port (default: 8090)</li>
+ *   <li>username - Authentication username (default: "iggy")</li>
+ *   <li>password - Authentication password (default: "iggy")</li>
+ *   <li>stream.id - Iggy stream identifier (required)</li>
+ *   <li>topic.id - Iggy topic identifier (required)</li>
+ *   <li>consumer.group - Consumer group name (required)</li>
+ *   <li>connection.pool.size - TCP connection pool size (default: 4)</li>
+ *   <li>poll.batch.size - Number of messages to fetch per poll (default: 100)</li>
+ *   <li>enable.tls - Enable TLS encryption (default: false)</li>
+ * </ul>
+ *
+ * <p>Required properties are checked when the object is constructed. Optional
+ * numeric properties are parsed each time their getter is called, so a malformed
+ * value surfaces as a {@link NumberFormatException} from that getter.
+ */
+public class IggyStreamConfig {
+
+    private static final String IGGY_PREFIX = "stream.iggy.";
+
+    // Connection properties
+    private static final String HOST_KEY = IGGY_PREFIX + "host";
+    private static final String PORT_KEY = IGGY_PREFIX + "port";
+    private static final String USERNAME_KEY = IGGY_PREFIX + "username";
+    private static final String PASSWORD_KEY = IGGY_PREFIX + "password";
+    private static final String ENABLE_TLS_KEY = IGGY_PREFIX + "enable.tls";
+    private static final String CONNECTION_POOL_SIZE_KEY = IGGY_PREFIX + "connection.pool.size";
+
+    // Stream/Topic properties
+    private static final String STREAM_ID_KEY = IGGY_PREFIX + "stream.id";
+    private static final String TOPIC_ID_KEY = IGGY_PREFIX + "topic.id";
+
+    // Consumer properties
+    private static final String CONSUMER_GROUP_KEY = IGGY_PREFIX + "consumer.group";
+    private static final String POLL_BATCH_SIZE_KEY = IGGY_PREFIX + "poll.batch.size";
+
+    // Default values
+    private static final int DEFAULT_PORT = 8090;
+    private static final String DEFAULT_USERNAME = "iggy";
+    private static final String DEFAULT_PASSWORD = "iggy";
+    private static final boolean DEFAULT_ENABLE_TLS = false;
+    private static final int DEFAULT_CONNECTION_POOL_SIZE = 4;
+    private static final int DEFAULT_POLL_BATCH_SIZE = 100;
+
+    private final StreamConfig streamConfig;
+    private final Map<String, String> props;
+
+    /**
+     * Creates a new Iggy stream configuration from Pinot's StreamConfig.
+     *
+     * @param streamConfig Pinot stream configuration
+     * @throws IllegalArgumentException if a required property is missing or blank
+     */
+    public IggyStreamConfig(StreamConfig streamConfig) {
+        this.streamConfig = streamConfig;
+        this.props = streamConfig.getStreamConfigsMap();
+        validate();
+    }
+
+    /**
+     * Validates required configuration properties.
+     *
+     * @throws IllegalArgumentException if required properties are missing
+     */
+    private void validate() {
+        requireProperty(HOST_KEY, "Iggy server host is required");
+        requireProperty(STREAM_ID_KEY, "Iggy stream ID is required");
+        requireProperty(TOPIC_ID_KEY, "Iggy topic ID is required");
+        requireProperty(CONSUMER_GROUP_KEY, "Iggy consumer group is required");
+    }
+
+    /**
+     * Fails when the given property is absent, null, or only whitespace.
+     *
+     * @param key full property key to check
+     * @param errorMessage message prefix used in the thrown exception
+     * @throws IllegalArgumentException if the property has no usable value
+     */
+    private void requireProperty(String key, String errorMessage) {
+        // A single lookup covers "absent" and "mapped to null" alike
+        String value = props.get(key);
+        if (value == null || value.trim().isEmpty()) {
+            throw new IllegalArgumentException(errorMessage + " (property: " + key + ")");
+        }
+    }
+
+    /**
+     * Reads an optional integer property.
+     *
+     * @param key full property key to read
+     * @param defaultValue value returned when the property is not set
+     * @return the parsed value, or {@code defaultValue} when the property is absent
+     * @throws NumberFormatException if the property is set but is not a valid integer;
+     *     the message names the offending property
+     */
+    private int getIntProperty(String key, int defaultValue) {
+        String raw = props.get(key);
+        if (raw == null) {
+            return defaultValue;
+        }
+        try {
+            // Tolerate stray whitespace around the number
+            return Integer.parseInt(raw.trim());
+        } catch (NumberFormatException e) {
+            // Same exception type as a bare parseInt, but says which key is wrong
+            NumberFormatException detailed =
+                    new NumberFormatException("Invalid integer value '" + raw + "' for property: " + key);
+            detailed.initCause(e);
+            throw detailed;
+        }
+    }
+
+    /**
+     * @return the configured Iggy server host
+     */
+    public String getHost() {
+        return props.get(HOST_KEY);
+    }
+
+    /**
+     * @return the configured TCP port, or 8090 when not set
+     * @throws NumberFormatException if the configured value is not an integer
+     */
+    public int getPort() {
+        return getIntProperty(PORT_KEY, DEFAULT_PORT);
+    }
+
+    /**
+     * @return the configured username, or "iggy" when not set
+     */
+    public String getUsername() {
+        return props.getOrDefault(USERNAME_KEY, DEFAULT_USERNAME);
+    }
+
+    /**
+     * @return the configured password, or "iggy" when not set
+     */
+    public String getPassword() {
+        return props.getOrDefault(PASSWORD_KEY, DEFAULT_PASSWORD);
+    }
+
+    /**
+     * @return {@code true} only when the property is set to "true" (case-insensitive,
+     *     surrounding whitespace ignored); {@code false} otherwise, including when unset
+     */
+    public boolean isEnableTls() {
+        String tlsStr = props.get(ENABLE_TLS_KEY);
+        return tlsStr != null ? Boolean.parseBoolean(tlsStr.trim()) : DEFAULT_ENABLE_TLS;
+    }
+
+    /**
+     * @return the configured connection pool size, or 4 when not set
+     * @throws NumberFormatException if the configured value is not an integer
+     */
+    public int getConnectionPoolSize() {
+        return getIntProperty(CONNECTION_POOL_SIZE_KEY, DEFAULT_CONNECTION_POOL_SIZE);
+    }
+
+    /**
+     * @return the configured Iggy stream identifier
+     */
+    public String getStreamId() {
+        return props.get(STREAM_ID_KEY);
+    }
+
+    /**
+     * @return the configured Iggy topic identifier
+     */
+    public String getTopicId() {
+        return props.get(TOPIC_ID_KEY);
+    }
+
+    /**
+     * @return the configured consumer group name
+     */
+    public String getConsumerGroup() {
+        return props.get(CONSUMER_GROUP_KEY);
+    }
+
+    /**
+     * @return the configured poll batch size, or 100 when not set
+     * @throws NumberFormatException if the configured value is not an integer
+     */
+    public int getPollBatchSize() {
+        return getIntProperty(POLL_BATCH_SIZE_KEY, DEFAULT_POLL_BATCH_SIZE);
+    }
+
+    /**
+     * Gets the offset specification from Pinot's consumer config.
+     *
+     * @return offset criteria
+     */
+    public OffsetCriteria getOffsetCriteria() {
+        return streamConfig.getOffsetCriteria();
+    }
+
+    /**
+     * Gets the Pinot table name for this stream.
+     *
+     * @return table name with type suffix
+     */
+    public String getTableNameWithType() {
+        return streamConfig.getTableNameWithType();
+    }
+
+    /**
+     * Creates server address in format "host:port".
+     *
+     * @return server address string
+     * @throws NumberFormatException if the configured port is not an integer
+     */
+    public String getServerAddress() {
+        return getHost() + ":" + getPort();
+    }
+
+    /**
+     * Renders the effective settings for logging; the password is left out.
+     */
+    @Override
+    public String toString() {
+        return "IggyStreamConfig{"
+                + "host='"
+                + getHost()
+                + '\''
+                + ", port="
+                + getPort()
+                + ", username='"
+                + getUsername()
+                + '\''
+                + ", streamId='"
+                + getStreamId()
+                + '\''
+                + ", topicId='"
+                + getTopicId()
+                + '\''
+                + ", consumerGroup='"
+                + getConsumerGroup()
+                + '\''
+                + ", enableTls="
+                + isEnableTls()
+                + ", connectionPoolSize="
+                + getConnectionPoolSize()
+                + ", pollBatchSize="
+                + getPollBatchSize()
+                + '}';
+    }
+}
diff --git
a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyConsumerFactory.java
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyConsumerFactory.java
new file mode 100644
index 000000000..8c10f4f60
--- /dev/null
+++
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyConsumerFactory.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.consumer;
+
+import org.apache.iggy.connector.pinot.config.IggyStreamConfig;
+import org.apache.iggy.connector.pinot.metadata.IggyStreamMetadataProvider;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+
+/**
+ * {@link StreamConsumerFactory} that wires Pinot's stream ingestion framework to Iggy.
+ * Every {@code create*} call wraps the Pinot {@link StreamConfig} received in
+ * {@link #init(StreamConfig)} in a fresh {@link IggyStreamConfig}.
+ *
+ * <p>Configuration in Pinot table config:
+ * <pre>{@code
+ * "streamConfigs": {
+ *   "streamType": "iggy",
+ *   "stream.iggy.consumer.factory.class.name": "org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory",
+ *   "stream.iggy.host": "localhost",
+ *   "stream.iggy.port": "8090",
+ *   "stream.iggy.username": "iggy",
+ *   "stream.iggy.password": "iggy",
+ *   "stream.iggy.stream.id": "my-stream",
+ *   "stream.iggy.topic.id": "my-topic",
+ *   "stream.iggy.consumer.group": "pinot-consumer-group",
+ *   "stream.iggy.poll.batch.size": "100"
+ * }
+ * }</pre>
+ */
+public class IggyConsumerFactory extends StreamConsumerFactory {
+
+    private StreamConfig streamConfig;
+
+    /**
+     * Stores the Pinot stream configuration used by all subsequent {@code create*} calls.
+     *
+     * @param streamConfig Pinot stream configuration for the table
+     */
+    @Override
+    public void init(StreamConfig streamConfig) {
+        this.streamConfig = streamConfig;
+    }
+
+    /**
+     * Creates a consumer for the partition group named in the given consumption status.
+     * Only the partition group ID is read from the status here.
+     *
+     * @param clientId unique identifier for this consumer instance
+     * @param partitionGroupConsumptionStatus consumption status carrying the partition group ID
+     * @return a new {@link IggyPartitionGroupConsumer}
+     */
+    @Override
+    public PartitionGroupConsumer createPartitionGroupConsumer(
+            String clientId, PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
+        final IggyStreamConfig iggyConfig = newIggyConfig();
+        final int groupId = partitionGroupConsumptionStatus.getPartitionGroupId();
+        return new IggyPartitionGroupConsumer(clientId, iggyConfig, groupId);
+    }
+
+    /**
+     * Creates a {@link PartitionLevelConsumer} for the given partition by wrapping an
+     * {@link IggyPartitionGroupConsumer} in an {@link IggyPartitionLevelConsumer}.
+     *
+     * @param clientId unique identifier for this consumer instance
+     * @param partition partition identifier
+     * @return a new partition-level consumer delegating to a partition group consumer
+     */
+    @Override
+    public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {
+        final IggyStreamConfig iggyConfig = newIggyConfig();
+        return new IggyPartitionLevelConsumer(new IggyPartitionGroupConsumer(clientId, iggyConfig, partition));
+    }
+
+    /**
+     * Creates a metadata provider bound to one partition group.
+     *
+     * @param clientId unique identifier for this metadata provider instance
+     * @param groupId partition group identifier
+     * @return a new {@link IggyStreamMetadataProvider} for that group
+     */
+    @Override
+    public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int groupId) {
+        final IggyStreamConfig iggyConfig = newIggyConfig();
+        return new IggyStreamMetadataProvider(clientId, iggyConfig, groupId);
+    }
+
+    /**
+     * Creates a metadata provider that is not bound to a specific partition group.
+     *
+     * @param clientId unique identifier for this metadata provider instance
+     * @return a new {@link IggyStreamMetadataProvider}
+     */
+    @Override
+    public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
+        final IggyStreamConfig iggyConfig = newIggyConfig();
+        return new IggyStreamMetadataProvider(clientId, iggyConfig);
+    }
+
+    /** Wraps the stored Pinot stream config in a new Iggy-specific view. */
+    private IggyStreamConfig newIggyConfig() {
+        return new IggyStreamConfig(this.streamConfig);
+    }
+}
diff --git
a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyMessageBatch.java
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyMessageBatch.java
new file mode 100644
index 000000000..5e59ba6a8
--- /dev/null
+++
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyMessageBatch.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.consumer;
+
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.util.List;
+
+/**
+ * Implementation of Pinot's MessageBatch for Iggy messages.
+ * Wraps a list of messages with their offsets for consumption by Pinot.
+ */
+public class IggyMessageBatch implements MessageBatch<byte[]> {
+
+ private final List<IggyMessageAndOffset> messages;
+
+ /**
+ * Creates a new message batch.
+ *
+ * @param messages list of messages with offsets
+ */
+ public IggyMessageBatch(List<IggyMessageAndOffset> messages) {
+ this.messages = messages;
+ }
+
+ @Override
+ public int getMessageCount() {
+ return messages.size();
+ }
+
+ @Override
+ public byte[] getMessageAtIndex(int index) {
+ return messages.get(index).message;
+ }
+
+ @Override
+ public int getMessageOffsetAtIndex(int index) {
+ return index;
+ }
+
+ @Override
+ public int getMessageLengthAtIndex(int index) {
+ return messages.get(index).message.length;
+ }
+
+ @Override
+ public long getNextStreamMessageOffsetAtIndex(int index) {
+ if (index >= 0 && index < messages.size()) {
+ return messages.get(index).offset.getOffset();
+ }
+ return 0;
+ }
+
+ @Override
+ public StreamPartitionMsgOffset getNextStreamPartitionMsgOffsetAtIndex(int
index) {
+ if (index >= 0 && index < messages.size()) {
+ return messages.get(index).offset;
+ }
+ return null;
+ }
+
+ @Override
+ public StreamMessage<byte[]> getStreamMessage(int index) {
+ IggyMessageAndOffset messageAndOffset = messages.get(index);
+
+ // Calculate next offset (current + 1)
+ long currentOffset = messageAndOffset.offset.getOffset();
+ IggyStreamPartitionMsgOffset nextOffset = new
IggyStreamPartitionMsgOffset(currentOffset + 1);
+
+ // Create metadata with offset information
+ StreamMessageMetadata metadata = new StreamMessageMetadata.Builder()
+ .setRecordIngestionTimeMs(System.currentTimeMillis())
+ .setOffset(messageAndOffset.offset, nextOffset)
+ .build();
+
+ // Create and return StreamMessage
+ return new StreamMessage<>(null, messageAndOffset.message,
messageAndOffset.message.length, metadata);
+ }
+
+ @Override
+ public StreamPartitionMsgOffset getOffsetOfNextBatch() {
+ if (messages.isEmpty()) {
+ return new IggyStreamPartitionMsgOffset(0);
+ }
+ // Return the offset after the last message
+ long lastOffset = messages.get(messages.size() - 1).offset.getOffset();
+ return new IggyStreamPartitionMsgOffset(lastOffset + 1);
+ }
+
+ /**
+ * Container for an Iggy message and its offset.
+ */
+ public static class IggyMessageAndOffset {
+ private final byte[] message;
+ private final IggyStreamPartitionMsgOffset offset;
+
+ public IggyMessageAndOffset(byte[] message,
IggyStreamPartitionMsgOffset offset) {
+ this.message = message;
+ this.offset = offset;
+ }
+
+ public byte[] getMessage() {
+ return message;
+ }
+
+ public IggyStreamPartitionMsgOffset getOffset() {
+ return offset;
+ }
+ }
+}
diff --git
a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionGroupConsumer.java
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionGroupConsumer.java
new file mode 100644
index 000000000..37ba72fc0
--- /dev/null
+++
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionGroupConsumer.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.consumer;
+
+import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
+import org.apache.iggy.connector.pinot.config.IggyStreamConfig;
+import org.apache.iggy.consumergroup.Consumer;
+import org.apache.iggy.identifier.ConsumerId;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.PolledMessages;
+import org.apache.iggy.message.PollingStrategy;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Partition-level consumer implementation for Iggy streams.
+ * Reads messages from a single Iggy partition using the AsyncIggyTcpClient.
+ *
+ * <p>This consumer manages:
+ * <ul>
+ * <li>TCP connection to Iggy server</li>
+ * <li>Single consumer mode (not consumer groups)</li>
+ * <li>Message polling with explicit offset tracking</li>
+ * <li>Offset management controlled by Pinot</li>
+ * </ul>
+ */
+public class IggyPartitionGroupConsumer implements PartitionGroupConsumer {
+
+ private static final Logger log =
LoggerFactory.getLogger(IggyPartitionGroupConsumer.class);
+
+ private final IggyStreamConfig config;
+ private final int partitionId;
+
+ private AsyncIggyTcpClient asyncClient;
+ private StreamId streamId;
+ private TopicId topicId;
+ private Consumer consumer;
+ private long currentOffset;
+
+ /**
+ * Creates a new partition consumer.
+ *
+ * @param clientId unique identifier for this consumer
+ * @param config Iggy stream configuration
+ * @param partitionId the partition to consume from
+ */
+ public IggyPartitionGroupConsumer(String clientId, IggyStreamConfig
config, int partitionId) {
+ this.config = config;
+ this.partitionId = partitionId;
+ this.currentOffset = 0;
+
+ log.info(
+ "Created IggyPartitionGroupConsumer: clientId={},
partition={}, config={}",
+ clientId,
+ partitionId,
+ config);
+ }
+
+ /**
+ * Fetches the next batch of messages from the Iggy partition.
+ * This method is called repeatedly by Pinot to poll for new messages.
+ *
+ * @param startOffset the offset to start consuming from (may be null)
+ * @param timeoutMillis timeout for the fetch operation
+ * @return batch of messages, or empty batch if no messages available
+ */
+ @Override
+ public MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset,
int timeoutMillis) {
+ try {
+ ensureConnected();
+
+ // No need to join consumer group when using single consumer
+
+ // Determine starting offset
+ long fetchOffset = determineStartOffset(startOffset);
+ log.debug("Fetching messages from partition {} at offset {}",
partitionId, fetchOffset);
+
+ // Poll messages from Iggy
+ PolledMessages polledMessages = pollMessages(fetchOffset);
+ log.debug(
+ "Polled {} messages from partition {}",
+ polledMessages.messages().size(),
+ partitionId);
+
+ // Convert to Pinot MessageBatch
+ MessageBatch batch = convertToMessageBatch(polledMessages);
+ return batch;
+
+ } catch (RuntimeException e) {
+ log.error("Error fetching messages from partition {}: {}",
partitionId, e.getMessage(), e);
+ return new IggyMessageBatch(new ArrayList<>());
+ }
+ }
+
+ /**
+ * Ensures TCP connection to Iggy server is established.
+ */
+ private void ensureConnected() {
+ if (asyncClient == null) {
+ log.info("Connecting to Iggy server: {}",
config.getServerAddress());
+
+ asyncClient = AsyncIggyTcpClient.builder()
+ .host(config.getHost())
+ .port(config.getPort())
+ .credentials(config.getUsername(), config.getPassword())
+ .connectionPoolSize(config.getConnectionPoolSize())
+ .build();
+
+ // Connect and authenticate
+ asyncClient.connect().join();
+
+ // Parse stream and topic IDs
+ streamId = parseStreamId(config.getStreamId());
+ topicId = parseTopicId(config.getTopicId());
+ // Use single consumer instead of consumer group for explicit
offset control
+ consumer = Consumer.of(ConsumerId.of(Long.valueOf(partitionId)));
+
+ log.info("Connected to Iggy server successfully");
+ }
+ }
+
+ /**
+ * Determines the starting offset for polling.
+ */
+ private long determineStartOffset(StreamPartitionMsgOffset startOffset) {
+ if (startOffset != null && startOffset instanceof
IggyStreamPartitionMsgOffset) {
+ IggyStreamPartitionMsgOffset iggyOffset =
(IggyStreamPartitionMsgOffset) startOffset;
+ currentOffset = iggyOffset.getOffset();
+ log.debug("Using provided start offset: {}", currentOffset);
+ return currentOffset;
+ }
+
+ // Use current tracked offset when no explicit offset provided
+ log.debug("Using current tracked offset for partition {}: {}",
partitionId, currentOffset);
+ return currentOffset;
+ }
+
+ /**
+ * Polls messages from Iggy using TCP client.
+ */
+ private PolledMessages pollMessages(long fetchOffset) {
+ try {
+ Optional<Long> partition = Optional.of((long) partitionId);
+
+ // Use explicit offset strategy to fetch from the offset Pinot
requested
+ PollingStrategy strategy =
PollingStrategy.offset(java.math.BigInteger.valueOf(fetchOffset));
+
+ log.debug(
+ "Polling messages: partition={}, offset={}, batchSize={}",
+ partitionId,
+ fetchOffset,
+ config.getPollBatchSize());
+
+ // Poll with auto-commit disabled (we'll manage offsets via Pinot)
+ PolledMessages polledMessages = asyncClient
+ .messages()
+ .pollMessagesAsync(
+ streamId,
+ topicId,
+ partition,
+ consumer,
+ strategy,
+ Long.valueOf(config.getPollBatchSize()),
+ false)
+ .join();
+
+ log.debug(
+ "Polled {} messages from partition {}, currentOffset={}",
+ polledMessages.messages().size(),
+ partitionId,
+ polledMessages.currentOffset());
+
+ // Update current offset only if we got messages
+ if (!polledMessages.messages().isEmpty() &&
polledMessages.currentOffset() != null) {
+ currentOffset = polledMessages.currentOffset().longValue() + 1;
+ }
+
+ return polledMessages;
+
+ } catch (RuntimeException e) {
+ log.error("Error polling messages: {}", e.getMessage(), e);
+ throw new RuntimeException("Failed to poll messages", e);
+ }
+ }
+
+ /**
+ * Converts Iggy PolledMessages to Pinot MessageBatch.
+ */
+ private MessageBatch convertToMessageBatch(PolledMessages polledMessages) {
+ List<IggyMessageBatch.IggyMessageAndOffset> messages = new
ArrayList<>();
+
+ for (Message message : polledMessages.messages()) {
+ long offset = message.header().offset().longValue();
+ byte[] payload = message.payload();
+
+ IggyStreamPartitionMsgOffset msgOffset = new
IggyStreamPartitionMsgOffset(offset);
+ messages.add(new IggyMessageBatch.IggyMessageAndOffset(payload,
msgOffset));
+ }
+
+ return new IggyMessageBatch(messages);
+ }
+
+ /**
+ * Parses stream ID from string (supports both numeric and named streams).
+ */
+ private StreamId parseStreamId(String streamIdStr) {
+ try {
+ return StreamId.of(Long.parseLong(streamIdStr));
+ } catch (NumberFormatException e) {
+ return StreamId.of(streamIdStr);
+ }
+ }
+
+ /**
+ * Parses topic ID from string (supports both numeric and named topics).
+ */
+ private TopicId parseTopicId(String topicIdStr) {
+ try {
+ return TopicId.of(Long.parseLong(topicIdStr));
+ } catch (NumberFormatException e) {
+ return TopicId.of(topicIdStr);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (asyncClient != null) {
+ try {
+ log.info("Closing Iggy consumer for partition {}",
partitionId);
+ asyncClient.close().join();
+ log.info("Iggy consumer closed successfully");
+ } catch (RuntimeException e) {
+ log.error("Error closing Iggy client: {}", e.getMessage(), e);
+ } finally {
+ asyncClient = null;
+ }
+ }
+ }
+}
diff --git
a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionLevelConsumer.java
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionLevelConsumer.java
new file mode 100644
index 000000000..5fa5b911e
--- /dev/null
+++
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyPartitionLevelConsumer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.consumer;
+
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Adapter exposing an {@link IggyPartitionGroupConsumer} through the
+ * {@link PartitionLevelConsumer} interface. Every call is forwarded unchanged.
+ */
+public class IggyPartitionLevelConsumer implements PartitionLevelConsumer {
+
+    private final IggyPartitionGroupConsumer groupConsumer;
+
+    /**
+     * @param delegate the partition group consumer that performs the actual work
+     */
+    public IggyPartitionLevelConsumer(IggyPartitionGroupConsumer delegate) {
+        this.groupConsumer = delegate;
+    }
+
+    /**
+     * Forwards the fetch to the wrapped consumer.
+     *
+     * @param startOffset offset to start consuming from
+     * @param timeoutMs timeout passed through to the wrapped consumer
+     * @return the batch produced by the wrapped consumer
+     * @throws TimeoutException declared by the interface
+     */
+    @Override
+    public MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, int timeoutMs) throws TimeoutException {
+        final MessageBatch fetched = groupConsumer.fetchMessages(startOffset, timeoutMs);
+        return fetched;
+    }
+
+    /** Closes the wrapped consumer. */
+    @Override
+    public void close() {
+        groupConsumer.close();
+    }
+}
diff --git
a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyStreamPartitionMsgOffset.java
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyStreamPartitionMsgOffset.java
new file mode 100644
index 000000000..6b7dce8bf
--- /dev/null
+++
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/consumer/IggyStreamPartitionMsgOffset.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.consumer;
+
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+/**
+ * Offset within an Iggy stream partition, held as a single {@code long}.
+ * Ordering, equality and hashing are all based on that value.
+ */
+public class IggyStreamPartitionMsgOffset implements StreamPartitionMsgOffset {
+
+    private final long offset;
+
+    /**
+     * Wraps a raw offset value.
+     *
+     * @param offset the Iggy offset value
+     */
+    public IggyStreamPartitionMsgOffset(long offset) {
+        this.offset = offset;
+    }
+
+    /** @return the wrapped offset value */
+    public long getOffset() {
+        return offset;
+    }
+
+    /**
+     * Compares by numeric offset. Accepts another {@code IggyStreamPartitionMsgOffset} or
+     * Pinot's {@link LongMsgOffset}.
+     *
+     * @param other the offset to compare against
+     * @return negative, zero or positive as this offset is less than, equal to or greater than {@code other}
+     * @throws IllegalArgumentException if {@code other} is of any other offset type
+     */
+    @Override
+    public int compareTo(StreamPartitionMsgOffset other) {
+        final long otherValue;
+        if (other instanceof IggyStreamPartitionMsgOffset) {
+            otherValue = ((IggyStreamPartitionMsgOffset) other).offset;
+        } else if (other instanceof LongMsgOffset) {
+            otherValue = ((LongMsgOffset) other).getOffset();
+        } else {
+            throw new IllegalArgumentException("Cannot compare with incompatible offset type: " + other.getClass());
+        }
+        return Long.compare(this.offset, otherValue);
+    }
+
+    /** @return the offset rendered as a decimal string */
+    @Override
+    public String toString() {
+        return Long.toString(offset);
+    }
+
+    /** Two offsets are equal when they are of exactly this class and wrap the same value. */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        return ((IggyStreamPartitionMsgOffset) o).offset == this.offset;
+    }
+
+    @Override
+    public int hashCode() {
+        return Long.hashCode(offset);
+    }
+}
diff --git
a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/decoder/IggyJsonMessageDecoder.java
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/decoder/IggyJsonMessageDecoder.java
new file mode 100644
index 000000000..1a619f348
--- /dev/null
+++
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/decoder/IggyJsonMessageDecoder.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.decoder;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * JSON message decoder for Iggy streams.
+ * Parses each payload as a JSON object and copies every top-level key/value pair into the
+ * destination {@link GenericRow}. The {@code fieldsToRead} passed to {@link #init} is not
+ * used to filter keys.
+ *
+ * <p>Configuration in Pinot table config:
+ * <pre>{@code
+ * "streamConfigs": {
+ *   "stream.iggy.decoder.class.name": "org.apache.iggy.connector.pinot.decoder.IggyJsonMessageDecoder"
+ * }
+ * }</pre>
+ */
+public class IggyJsonMessageDecoder implements StreamMessageDecoder<byte[]> {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /**
+     * Initializes the decoder. This implementation keeps no state and ignores all arguments.
+     *
+     * @param props decoder properties from streamConfigs
+     * @param fieldsToRead set of fields to read from messages
+     * @param topicName topic name
+     * @throws Exception declared by the interface; never thrown here
+     */
+    @Override
+    public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName) throws Exception {
+        // No special initialization needed for basic JSON decoding
+    }
+
+    /**
+     * Decodes a whole byte array as one JSON object.
+     *
+     * @param payload raw byte array containing JSON
+     * @param destination row to populate
+     * @return {@code destination}, populated with the decoded fields
+     * @throws RuntimeException if the payload cannot be parsed or is the JSON literal {@code null}
+     */
+    @Override
+    public GenericRow decode(byte[] payload, GenericRow destination) {
+        try {
+            @SuppressWarnings("unchecked")
+            Map<String, Object> jsonMap = OBJECT_MAPPER.readValue(payload, Map.class);
+            return copyInto(jsonMap, destination);
+        } catch (java.io.IOException e) {
+            throw new RuntimeException("Failed to decode JSON message", e);
+        }
+    }
+
+    /**
+     * Decodes a slice of a byte array as one JSON object. The slice is parsed in place,
+     * without copying it into a temporary array first.
+     *
+     * @param payload raw byte array containing JSON
+     * @param offset index in {@code payload} where the message starts
+     * @param length number of bytes belonging to the message
+     * @param destination row to populate
+     * @return {@code destination}, populated with the decoded fields
+     * @throws RuntimeException if the slice cannot be parsed or is the JSON literal {@code null}
+     */
+    @Override
+    public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
+        try {
+            @SuppressWarnings("unchecked")
+            Map<String, Object> jsonMap = OBJECT_MAPPER.readValue(payload, offset, length, Map.class);
+            return copyInto(jsonMap, destination);
+        } catch (java.io.IOException e) {
+            throw new RuntimeException("Failed to decode JSON message", e);
+        }
+    }
+
+    /** Copies every entry of the parsed JSON object into the row; rejects a parsed {@code null}. */
+    private static GenericRow copyInto(Map<String, Object> jsonMap, GenericRow destination) {
+        if (jsonMap == null) {
+            // Jackson maps the JSON literal null to a Java null; fail with a clear message
+            // instead of a bare NullPointerException.
+            throw new RuntimeException("Failed to decode JSON message: payload is JSON null, expected an object");
+        }
+        for (Map.Entry<String, Object> entry : jsonMap.entrySet()) {
+            destination.putValue(entry.getKey(), entry.getValue());
+        }
+        return destination;
+    }
+}
diff --git
a/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/metadata/IggyStreamMetadataProvider.java
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/metadata/IggyStreamMetadataProvider.java
new file mode 100644
index 000000000..36ef0aa6e
--- /dev/null
+++
b/foreign/java/external-processors/iggy-connector-pinot/src/main/java/org/apache/iggy/connector/pinot/metadata/IggyStreamMetadataProvider.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.metadata;
+
+import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
+import org.apache.iggy.connector.pinot.config.IggyStreamConfig;
+import org.apache.iggy.connector.pinot.consumer.IggyStreamPartitionMsgOffset;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.partition.Partition;
+import org.apache.iggy.topic.TopicDetails;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/**
+ * Metadata provider for Iggy streams.
+ * Provides information about partitions, offsets, and message counts.
+ *
+ * <p>This provider connects to Iggy via TCP to query:
+ * <ul>
+ * <li>Number of partitions in a topic</li>
+ * <li>Oldest available offset per partition</li>
+ * <li>Latest offset per partition</li>
+ * <li>Message counts</li>
+ * </ul>
+ */
+public class IggyStreamMetadataProvider implements StreamMetadataProvider {
+
+ private static final Logger log =
LoggerFactory.getLogger(IggyStreamMetadataProvider.class);
+
+ private static final long DETAILS_CACHE_MS = 5000; // 5 seconds cache
+
+ private final IggyStreamConfig config;
+ private final Integer partitionId; // null for stream-level, non-null for
partition-level
+
+ private AsyncIggyTcpClient asyncClient;
+ private StreamId streamId;
+ private TopicId topicId;
+ private TopicDetails cachedTopicDetails;
+ private long lastDetailsRefresh;
+
+ /**
+ * Creates a stream-level metadata provider (all partitions).
+ *
+ * @param clientId unique identifier
+ * @param config Iggy stream configuration
+ */
+ public IggyStreamMetadataProvider(String clientId, IggyStreamConfig
config) {
+ this(clientId, config, null);
+ }
+
+ /**
+ * Creates a partition-level metadata provider.
+ *
+ * @param clientId unique identifier
+ * @param config Iggy stream configuration
+ * @param partitionId specific partition ID
+ */
+ public IggyStreamMetadataProvider(String clientId, IggyStreamConfig
config, Integer partitionId) {
+ this.config = config;
+ this.partitionId = partitionId;
+
+ log.info(
+ "Created IggyStreamMetadataProvider: clientId={},
partitionId={}, config={}",
+ clientId,
+ partitionId,
+ config);
+ }
+
+ /**
+ * Retrieves the number of partitions and their metadata.
+ * Called by Pinot to discover available partitions in the stream.
+ *
+ * @param timeoutMillis timeout for the operation
+ * @return number of partitions in the topic
+ */
+ @Override
+ public int fetchPartitionCount(long timeoutMillis) {
+ try {
+ ensureConnected();
+ TopicDetails topicDetails = fetchTopicDetails();
+ int partitionCount = topicDetails.partitionsCount().intValue();
+ log.info("Found {} partitions for topic {}", partitionCount,
config.getTopicId());
+ return partitionCount;
+ } catch (RuntimeException e) {
+ log.error("Error fetching partition count: {}", e.getMessage(), e);
+ throw new RuntimeException("Failed to fetch partition count", e);
+ }
+ }
+
+ /**
+ * Fetches the current offset for consumption.
+ * For Iggy, we rely on consumer group state, so this returns the earliest
offset.
+ *
+ * @param offsetCriteria offset criteria (earliest, latest, etc.)
+ * @param timeoutMillis timeout for the operation
+ * @return current offset for the partition
+ */
+ @Override
+ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria
offsetCriteria, long timeoutMillis) {
+ try {
+ ensureConnected();
+
+ if (partitionId == null) {
+ throw new IllegalStateException("Partition ID must be set for
offset queries");
+ }
+
+ Partition partition = getPartitionInfo(partitionId);
+
+ // Handle offset criteria
+ if (offsetCriteria != null && offsetCriteria.isSmallest()) {
+ // Return earliest available offset (0 for Iggy)
+ return new IggyStreamPartitionMsgOffset(0);
+ } else if (offsetCriteria != null && offsetCriteria.isLargest()) {
+ // Return latest offset based on messages count
+ long latestOffset = partition.messagesCount().longValue();
+ return new IggyStreamPartitionMsgOffset(latestOffset);
+ } else {
+ // Default to consumer group managed offset (start from 0)
+ return new IggyStreamPartitionMsgOffset(0);
+ }
+
+ } catch (RuntimeException e) {
+ log.error("Error fetching partition offset: {}", e.getMessage(),
e);
+ throw new RuntimeException("Failed to fetch partition offset", e);
+ }
+ }
+
+ /**
+ * Ensures TCP connection to Iggy server is established.
+ */
+ private void ensureConnected() {
+ if (asyncClient == null) {
+ log.info("Connecting to Iggy server: {}",
config.getServerAddress());
+
+ asyncClient = AsyncIggyTcpClient.builder()
+ .host(config.getHost())
+ .port(config.getPort())
+ .credentials(config.getUsername(), config.getPassword())
+ .connectionPoolSize(config.getConnectionPoolSize())
+ .build();
+
+ // Connect and authenticate
+ asyncClient.connect().join();
+
+ // Parse stream and topic IDs
+ streamId = parseStreamId(config.getStreamId());
+ topicId = parseTopicId(config.getTopicId());
+
+ log.info("Connected to Iggy server successfully");
+ }
+ }
+
+ /**
+ * Fetches topic details with caching.
+ */
+ private TopicDetails fetchTopicDetails() {
+ long now = System.currentTimeMillis();
+ if (cachedTopicDetails == null || (now - lastDetailsRefresh) >
DETAILS_CACHE_MS) {
+ try {
+ Optional<TopicDetails> details =
+ asyncClient.topics().getTopicAsync(streamId,
topicId).join();
+ cachedTopicDetails =
+ details.orElseThrow(() -> new RuntimeException("Topic
not found: " + config.getTopicId()));
+ lastDetailsRefresh = now;
+ } catch (RuntimeException e) {
+ log.error("Error fetching topic details: {}", e.getMessage(),
e);
+ throw new RuntimeException("Failed to fetch topic details", e);
+ }
+ }
+ return cachedTopicDetails;
+ }
+
+ /**
+ * Gets information for a specific partition.
+ */
+ private Partition getPartitionInfo(int partitionId) {
+ TopicDetails details = fetchTopicDetails();
+ return details.partitions().stream()
+ .filter(p -> p.id().intValue() == partitionId)
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException("Partition " +
partitionId + " not found"));
+ }
+
+ /**
+ * Parses stream ID from string (supports both numeric and named streams).
+ */
+ private StreamId parseStreamId(String streamIdStr) {
+ try {
+ return StreamId.of(Long.parseLong(streamIdStr));
+ } catch (NumberFormatException e) {
+ return StreamId.of(streamIdStr);
+ }
+ }
+
+ /**
+ * Parses topic ID from string (supports both numeric and named topics).
+ */
+ private TopicId parseTopicId(String topicIdStr) {
+ try {
+ return TopicId.of(Long.parseLong(topicIdStr));
+ } catch (NumberFormatException e) {
+ return TopicId.of(topicIdStr);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (asyncClient != null) {
+ try {
+ log.info("Closing Iggy metadata provider");
+ asyncClient.close().join();
+ log.info("Iggy metadata provider closed successfully");
+ } catch (RuntimeException e) {
+ log.error("Error closing Iggy client: {}", e.getMessage(), e);
+ } finally {
+ asyncClient = null;
+ }
+ }
+ }
+}
diff --git
a/foreign/java/external-processors/iggy-connector-pinot/src/main/resources/META-INF/services/org.apache.pinot.spi.stream.StreamConsumerFactory
b/foreign/java/external-processors/iggy-connector-pinot/src/main/resources/META-INF/services/org.apache.pinot.spi.stream.StreamConsumerFactory
new file mode 100644
index 000000000..2a72c4d89
--- /dev/null
+++
b/foreign/java/external-processors/iggy-connector-pinot/src/main/resources/META-INF/services/org.apache.pinot.spi.stream.StreamConsumerFactory
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory
diff --git
a/foreign/java/external-processors/iggy-connector-pinot/src/main/resources/pinot-plugin.properties
b/foreign/java/external-processors/iggy-connector-pinot/src/main/resources/pinot-plugin.properties
new file mode 100644
index 000000000..98cae47c3
--- /dev/null
+++
b/foreign/java/external-processors/iggy-connector-pinot/src/main/resources/pinot-plugin.properties
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Iggy Stream Connector Plugin for Apache Pinot
+# This file is required for Pinot 1.3.0+ plugin discovery
+
+# Plugin name
+pluginName=iggy-connector
+
+# Plugin version
+pluginVersion=0.6.0
+
+# StreamConsumerFactory class
+stream.iggy.consumer.factory.class=org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory
+
+# MessageDecoder class
+stream.iggy.decoder.class=org.apache.iggy.connector.pinot.decoder.IggyJsonMessageDecoder
diff --git
a/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/config/IggyStreamConfigTest.java
b/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/config/IggyStreamConfigTest.java
new file mode 100644
index 000000000..43f0ac7d2
--- /dev/null
+++
b/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/config/IggyStreamConfigTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.config;
+
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class IggyStreamConfigTest {
+
+ @Test
+ void testValidConfiguration() {
+ Map<String, String> props = createValidConfig();
+ StreamConfig streamConfig = new StreamConfig("test_table_REALTIME",
props);
+ IggyStreamConfig config = new IggyStreamConfig(streamConfig);
+
+ assertEquals("localhost", config.getHost());
+ assertEquals(8090, config.getPort());
+ assertEquals("iggy", config.getUsername());
+ assertEquals("iggy", config.getPassword());
+ assertEquals("analytics", config.getStreamId());
+ assertEquals("events", config.getTopicId());
+ assertEquals("test-consumer-group", config.getConsumerGroup());
+ assertEquals(100, config.getPollBatchSize());
+ assertEquals(4, config.getConnectionPoolSize());
+ assertFalse(config.isEnableTls());
+ }
+
+ @Test
+ void testCustomConfiguration() {
+ Map<String, String> props = createValidConfig();
+ props.put("stream.iggy.port", "9090");
+ props.put("stream.iggy.username", "custom-user");
+ props.put("stream.iggy.password", "custom-pass");
+ props.put("stream.iggy.poll.batch.size", "500");
+ props.put("stream.iggy.connection.pool.size", "8");
+ props.put("stream.iggy.enable.tls", "true");
+
+ StreamConfig streamConfig = new StreamConfig("test_table_REALTIME",
props);
+ IggyStreamConfig config = new IggyStreamConfig(streamConfig);
+
+ assertEquals(9090, config.getPort());
+ assertEquals("custom-user", config.getUsername());
+ assertEquals("custom-pass", config.getPassword());
+ assertEquals(500, config.getPollBatchSize());
+ assertEquals(8, config.getConnectionPoolSize());
+ assertTrue(config.isEnableTls());
+ }
+
+ @Test
+ void testMissingHostThrowsException() {
+ Map<String, String> props = createValidConfig();
+ props.remove("stream.iggy.host");
+
+ StreamConfig streamConfig = new StreamConfig("test_table_REALTIME",
props);
+
+ IllegalArgumentException exception =
+ assertThrows(IllegalArgumentException.class, () -> new
IggyStreamConfig(streamConfig));
+
+ assertTrue(exception.getMessage().contains("host"));
+ }
+
+ @Test
+ void testMissingStreamIdThrowsException() {
+ Map<String, String> props = createValidConfig();
+ props.remove("stream.iggy.stream.id");
+
+ StreamConfig streamConfig = new StreamConfig("test_table_REALTIME",
props);
+
+ IllegalArgumentException exception =
+ assertThrows(IllegalArgumentException.class, () -> new
IggyStreamConfig(streamConfig));
+
+ assertTrue(exception.getMessage().contains("stream ID"));
+ }
+
+ @Test
+ void testMissingTopicIdThrowsException() {
+ Map<String, String> props = createValidConfig();
+ props.remove("stream.iggy.topic.id");
+
+ StreamConfig streamConfig = new StreamConfig("test_table_REALTIME",
props);
+
+ IllegalArgumentException exception =
+ assertThrows(IllegalArgumentException.class, () -> new
IggyStreamConfig(streamConfig));
+
+ assertTrue(exception.getMessage().contains("topic ID"));
+ }
+
+ @Test
+ void testMissingConsumerGroupThrowsException() {
+ Map<String, String> props = createValidConfig();
+ props.remove("stream.iggy.consumer.group");
+
+ StreamConfig streamConfig = new StreamConfig("test_table_REALTIME",
props);
+
+ IllegalArgumentException exception =
+ assertThrows(IllegalArgumentException.class, () -> new
IggyStreamConfig(streamConfig));
+
+ assertTrue(exception.getMessage().contains("consumer group"));
+ }
+
+ @Test
+ void testServerAddress() {
+ Map<String, String> props = createValidConfig();
+ StreamConfig streamConfig = new StreamConfig("test_table_REALTIME",
props);
+ IggyStreamConfig config = new IggyStreamConfig(streamConfig);
+
+ assertEquals("localhost:8090", config.getServerAddress());
+ }
+
+ @Test
+ void testTableNameWithType() {
+ Map<String, String> props = createValidConfig();
+ StreamConfig streamConfig = new StreamConfig("events_REALTIME", props);
+ IggyStreamConfig config = new IggyStreamConfig(streamConfig);
+
+ assertEquals("events_REALTIME", config.getTableNameWithType());
+ }
+
+ @Test
+ void testToString() {
+ Map<String, String> props = createValidConfig();
+ StreamConfig streamConfig = new StreamConfig("test_table_REALTIME",
props);
+ IggyStreamConfig config = new IggyStreamConfig(streamConfig);
+
+ String str = config.toString();
+ assertTrue(str.contains("localhost"));
+ assertTrue(str.contains("8090"));
+ assertTrue(str.contains("analytics"));
+ assertTrue(str.contains("events"));
+ assertTrue(str.contains("test-consumer-group"));
+ }
+
+ @Test
+ void testNumericStreamAndTopicIds() {
+ Map<String, String> props = createValidConfig();
+ props.put("stream.iggy.stream.id", "123");
+ props.put("stream.iggy.topic.id", "456");
+
+ StreamConfig streamConfig = new StreamConfig("test_table_REALTIME",
props);
+ IggyStreamConfig config = new IggyStreamConfig(streamConfig);
+
+ assertEquals("123", config.getStreamId());
+ assertEquals("456", config.getTopicId());
+ }
+
+ private Map<String, String> createValidConfig() {
+ Map<String, String> props = new HashMap<>();
+ props.put("streamType", "iggy"); // Required by Pinot StreamConfig
+ props.put("stream.iggy.topic.name", "events"); // Required by Pinot
StreamConfig
+ props.put("stream.iggy.consumer.type", "lowlevel"); // Required by
Pinot
+ props.put(
+ "stream.iggy.consumer.factory.class.name",
+
"org.apache.iggy.connector.pinot.consumer.IggyConsumerFactory");
+ props.put("stream.iggy.decoder.class.name",
"org.apache.iggy.connector.pinot.decoder.IggyJsonMessageDecoder");
+
+ props.put("stream.iggy.host", "localhost");
+ props.put("stream.iggy.port", "8090");
+ props.put("stream.iggy.username", "iggy");
+ props.put("stream.iggy.password", "iggy");
+ props.put("stream.iggy.stream.id", "analytics");
+ props.put("stream.iggy.topic.id", "events");
+ props.put("stream.iggy.consumer.group", "test-consumer-group");
+ props.put("stream.iggy.poll.batch.size", "100");
+ props.put("stream.iggy.connection.pool.size", "4");
+ props.put("stream.iggy.enable.tls", "false");
+ return props;
+ }
+}
diff --git
a/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/consumer/IggyMessageBatchTest.java
b/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/consumer/IggyMessageBatchTest.java
new file mode 100644
index 000000000..aca0be844
--- /dev/null
+++
b/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/consumer/IggyMessageBatchTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.consumer;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Unit tests for {@link IggyMessageBatch} and its
+ * {@link IggyMessageBatch.IggyMessageAndOffset} wrapper.
+ */
+class IggyMessageBatchTest {
+
+    @Test
+    void testEmptyBatch() {
+        IggyMessageBatch emptyBatch = new IggyMessageBatch(new ArrayList<>());
+        assertEquals(0, emptyBatch.getMessageCount());
+    }
+
+    @Test
+    void testSingleMessage() {
+        byte[] body = utf8("test message");
+        IggyStreamPartitionMsgOffset position = new IggyStreamPartitionMsgOffset(100L);
+
+        List<IggyMessageBatch.IggyMessageAndOffset> entries = new ArrayList<>();
+        entries.add(new IggyMessageBatch.IggyMessageAndOffset(body, position));
+        IggyMessageBatch batch = new IggyMessageBatch(entries);
+
+        assertEquals(1, batch.getMessageCount());
+        assertArrayEquals(body, batch.getMessageAtIndex(0));
+        assertEquals(body.length, batch.getMessageLengthAtIndex(0));
+        assertEquals(100L, batch.getNextStreamMessageOffsetAtIndex(0));
+        assertEquals(position, batch.getNextStreamPartitionMsgOffsetAtIndex(0));
+    }
+
+    @Test
+    void testMultipleMessages() {
+        final int count = 10;
+        List<IggyMessageBatch.IggyMessageAndOffset> entries = new ArrayList<>();
+        for (int idx = 0; idx < count; idx++) {
+            entries.add(entry(utf8("message-" + idx), idx * 100L));
+        }
+
+        IggyMessageBatch batch = new IggyMessageBatch(entries);
+
+        assertEquals(10, batch.getMessageCount());
+
+        for (int idx = 0; idx < count; idx++) {
+            byte[] expectedBody = utf8("message-" + idx);
+            assertArrayEquals(expectedBody, batch.getMessageAtIndex(idx));
+            assertEquals(expectedBody.length, batch.getMessageLengthAtIndex(idx));
+            assertEquals(idx * 100L, batch.getNextStreamMessageOffsetAtIndex(idx));
+            assertEquals(idx, batch.getMessageOffsetAtIndex(idx));
+        }
+    }
+
+    @Test
+    void testMessageAndOffsetWrapper() {
+        byte[] body = utf8("test");
+        IggyStreamPartitionMsgOffset position = new IggyStreamPartitionMsgOffset(123L);
+
+        IggyMessageBatch.IggyMessageAndOffset wrapped = new IggyMessageBatch.IggyMessageAndOffset(body, position);
+
+        assertArrayEquals(body, wrapped.getMessage());
+        assertEquals(position, wrapped.getOffset());
+        assertEquals(123L, wrapped.getOffset().getOffset());
+    }
+
+    @Test
+    void testNullOffsetAtInvalidIndex() {
+        List<IggyMessageBatch.IggyMessageAndOffset> entries = new ArrayList<>();
+        entries.add(entry(utf8("test"), 100L));
+
+        IggyMessageBatch batch = new IggyMessageBatch(entries);
+
+        // Indexes outside the batch: one below the range, one above it.
+        assertNull(batch.getNextStreamPartitionMsgOffsetAtIndex(-1));
+        assertNull(batch.getNextStreamPartitionMsgOffsetAtIndex(10));
+        assertEquals(0, batch.getNextStreamMessageOffsetAtIndex(-1));
+        assertEquals(0, batch.getNextStreamMessageOffsetAtIndex(10));
+    }
+
+    @Test
+    void testLargeMessageBatch() {
+        final int count = 1000;
+        final int bodySize = 1024; // 1KB per message
+
+        List<IggyMessageBatch.IggyMessageAndOffset> entries = new ArrayList<>();
+        for (int idx = 0; idx < count; idx++) {
+            entries.add(entry(new byte[bodySize], idx));
+        }
+
+        IggyMessageBatch batch = new IggyMessageBatch(entries);
+
+        assertEquals(1000, batch.getMessageCount());
+        assertEquals(1024, batch.getMessageLengthAtIndex(0));
+        assertEquals(1024, batch.getMessageLengthAtIndex(999));
+    }
+
+    /** Encodes the text as UTF-8 bytes. */
+    private static byte[] utf8(String text) {
+        return text.getBytes(StandardCharsets.UTF_8);
+    }
+
+    /** Pairs a payload with a freshly created offset holding the given value. */
+    private static IggyMessageBatch.IggyMessageAndOffset entry(byte[] body, long offsetValue) {
+        return new IggyMessageBatch.IggyMessageAndOffset(body, new IggyStreamPartitionMsgOffset(offsetValue));
+    }
+}
diff --git
a/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/consumer/IggyStreamPartitionMsgOffsetTest.java
b/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/consumer/IggyStreamPartitionMsgOffsetTest.java
new file mode 100644
index 000000000..d3841b784
--- /dev/null
+++
b/foreign/java/external-processors/iggy-connector-pinot/src/test/java/org/apache/iggy/connector/pinot/consumer/IggyStreamPartitionMsgOffsetTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.connector.pinot.consumer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for {@link IggyStreamPartitionMsgOffset}: value access, ordering,
+ * equality, hashing, and string rendering.
+ */
+class IggyStreamPartitionMsgOffsetTest {
+
+    @Test
+    void testOffsetCreation() {
+        assertEquals(100L, offsetOf(100L).getOffset());
+    }
+
+    @Test
+    void testCompareTo() {
+        IggyStreamPartitionMsgOffset hundred = offsetOf(100L);
+        IggyStreamPartitionMsgOffset twoHundred = offsetOf(200L);
+        IggyStreamPartitionMsgOffset anotherHundred = offsetOf(100L);
+
+        assertTrue(hundred.compareTo(twoHundred) < 0);
+        assertTrue(twoHundred.compareTo(hundred) > 0);
+        assertEquals(0, hundred.compareTo(anotherHundred));
+    }
+
+    @Test
+    void testEquals() {
+        IggyStreamPartitionMsgOffset hundred = offsetOf(100L);
+        IggyStreamPartitionMsgOffset anotherHundred = offsetOf(100L);
+        IggyStreamPartitionMsgOffset twoHundred = offsetOf(200L);
+
+        assertEquals(hundred, anotherHundred);
+        assertNotEquals(hundred, twoHundred);
+        assertEquals(hundred, hundred);
+        assertNotEquals(hundred, null);
+        assertNotEquals(hundred, "string");
+    }
+
+    @Test
+    void testHashCode() {
+        IggyStreamPartitionMsgOffset hundred = offsetOf(100L);
+        IggyStreamPartitionMsgOffset anotherHundred = offsetOf(100L);
+        IggyStreamPartitionMsgOffset twoHundred = offsetOf(200L);
+
+        assertEquals(hundred.hashCode(), anotherHundred.hashCode());
+        assertNotEquals(hundred.hashCode(), twoHundred.hashCode());
+    }
+
+    @Test
+    void testToString() {
+        assertEquals("12345", offsetOf(12345L).toString());
+    }
+
+    @Test
+    void testZeroOffset() {
+        IggyStreamPartitionMsgOffset zero = offsetOf(0L);
+        assertEquals(0L, zero.getOffset());
+        assertEquals("0", zero.toString());
+    }
+
+    @Test
+    void testLargeOffset() {
+        long nearMax = Long.MAX_VALUE - 1;
+        IggyStreamPartitionMsgOffset large = offsetOf(nearMax);
+        assertEquals(nearMax, large.getOffset());
+        assertEquals(String.valueOf(nearMax), large.toString());
+    }
+
+    /** Creates a new offset instance holding the given value. */
+    private static IggyStreamPartitionMsgOffset offsetOf(long value) {
+        return new IggyStreamPartitionMsgOffset(value);
+    }
+}
diff --git a/foreign/java/gradle/libs.versions.toml
b/foreign/java/gradle/libs.versions.toml
index 63cd5b3bd..dd3e8408c 100644
--- a/foreign/java/gradle/libs.versions.toml
+++ b/foreign/java/gradle/libs.versions.toml
@@ -19,8 +19,12 @@
# Flink
flink = "2.1.1"
+# Pinot
+pinot = "1.4.0"
+
# Jackson
jackson = "3.0.2"
+jackson2 = "2.18.2"
# Apache Commons
commons-lang3 = "3.20.0"
@@ -58,6 +62,10 @@ checkstyle = "12.1.2"
[libraries]
# Jackson
jackson-databind = { module = "tools.jackson.core:jackson-databind",
version.ref = "jackson" }
+jackson2-databind = { module = "com.fasterxml.jackson.core:jackson-databind",
version.ref = "jackson2" }
+
+# Pinot
+pinot-spi = { module = "org.apache.pinot:pinot-spi", version.ref = "pinot" }
# Apache HTTP Client
httpclient5 = { module = "org.apache.httpcomponents.client5:httpclient5",
version.ref = "httpclient5" }
diff --git a/foreign/java/settings.gradle.kts b/foreign/java/settings.gradle.kts
index 050119ed8..477beb85d 100644
--- a/foreign/java/settings.gradle.kts
+++ b/foreign/java/settings.gradle.kts
@@ -28,3 +28,6 @@ project(":iggy-connector-library").projectDir =
file("external-processors/iggy-c
include("iggy-flink-examples")
project(":iggy-flink-examples").projectDir =
file("external-processors/iggy-connector-flink/iggy-flink-examples")
+
+include("iggy-connector-pinot")
+project(":iggy-connector-pinot").projectDir =
file("external-processors/iggy-connector-pinot")