This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iggy-website.git
The following commit(s) were added to refs/heads/main by this push:
new 38f8e633 Improve connectors docs
38f8e633 is described below
commit 38f8e6332a1c0689fb4c167dcbc4d5a2226d9b8d
Author: spetz <[email protected]>
AuthorDate: Wed Dec 10 22:14:12 2025 +0100
Improve connectors docs
---
docs/connectors/introduction.md | 29 +++-
docs/connectors/runtime.md | 98 ++++++++++-
docs/connectors/sdk.md | 42 +++--
docs/connectors/sinks/_category_.json | 4 +
docs/connectors/sinks/elasticsearch.md | 50 ++++++
docs/connectors/sinks/iceberg.md | 87 ++++++++++
docs/connectors/sinks/postgres.md | 193 +++++++++++++++++++++
docs/connectors/sinks/quickwit.md | 67 ++++++++
docs/connectors/{ => sinks}/sink.md | 18 +-
docs/connectors/sources/_category_.json | 4 +
docs/connectors/sources/elasticsearch.md | 280 +++++++++++++++++++++++++++++++
docs/connectors/sources/postgres.md | 100 +++++++++++
docs/connectors/{ => sources}/source.md | 16 +-
docusaurus.config.ts | 1 +
14 files changed, 945 insertions(+), 44 deletions(-)
diff --git a/docs/connectors/introduction.md b/docs/connectors/introduction.md
index 4d89307c..6f8131d2 100644
--- a/docs/connectors/introduction.md
+++ b/docs/connectors/introduction.md
@@ -7,8 +7,6 @@ sidebar_position: 1
A highly performant and modular runtime for statically typed, yet
dynamically loaded connectors. Ingest data from external sources and push it
to Iggy streams, or fetch data from Iggy streams and push it to external
sources. Create your own Rust plugins by simply implementing either the
`Source` or `Sink` trait and build custom pipelines for data processing.
-**This is still WiP, and the runtime can be started only after compilation
from the source code (no installable package yet).**
-
The [docker image](https://hub.docker.com/r/apache/iggy-connect) is available,
and can be fetched via `docker pull apache/iggy-connect`.
## Features
@@ -20,7 +18,8 @@ The [docker
image](https://hub.docker.com/r/apache/iggy-connect) is available, a
- **Statically Typed**: Ensures type safety and compile-time checks, reducing
runtime errors.
- **Easy Customization**: Provides a simple interface for implementing custom
connectors, making it easy to create new plugins.
- **Data transformation**: Supports data transformation with the help of
existing functions.
-- **Powerful configuration**: Define your sinks, sources, and transformations
in the configuration file.
+- **Powerful configuration**: Define your sinks, sources, and transformations
in the configuration file or fetch them from a remote HTTP API.
+- **Flexible configuration providers**: Support for local file-based and
HTTP-based configuration providers for centralized configuration management.
## Quick Start
@@ -43,15 +42,29 @@ The [docker
image](https://hub.docker.com/r/apache/iggy-connect) is available, a
6. Start the connector runtime `cargo run --bin iggy-connectors -r` - you
should be able to browse Quickwit UI with records being constantly added to the
`events` index. At the same time, you should see the new messages being added
to the `example` stream and `topic1` topic by the test source connector - you
can use Iggy Web UI to browse the data. The messages will have the basic
field transformations applied.
+## Configuration
+
+The same rules apply for overriding the configuration via environment
variables as for the main [Iggy server](docs/server/configuration).
+
+You can provide the following environment variables during the runtime startup:
+
+```
+IGGY_CONNECTORS_ENV_PATH - path to the .env file for custom environment
variables
+
+IGGY_CONNECTORS_CONFIG_PATH - path to the connectors runtime configuration file
+```
+
+Any configuration section can be overridden with the `IGGY_CONNECTORS_`
prefix, followed by the section name and the key name, e.g.
`IGGY_CONNECTORS_IGGY_USERNAME`.
+
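The override naming is mechanical: uppercase the section and key names and join
them with underscores under the `IGGY_CONNECTORS_` prefix. A minimal sketch of
that convention (illustration only, not the runtime's actual implementation):

```rust
// Sketch of the env-var override naming convention described above.
// Illustration only; the runtime's real implementation may differ.
fn override_var(section: &str, key: &str) -> String {
    format!(
        "IGGY_CONNECTORS_{}_{}",
        section.to_uppercase(),
        key.to_uppercase()
    )
}

fn main() {
    // "iggy" section, "username" key -> IGGY_CONNECTORS_IGGY_USERNAME
    println!("{}", override_var("iggy", "username"));
}
```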
## Runtime
-All the connectors are implemented as Rust libraries and can be used as a part
of the connector runtime. The runtime is responsible for managing the lifecycle
of the connectors and providing the necessary infrastructure for the connectors
to run. For more information, please refer to the **[runtime
documentation](https://github.com/apache/iggy/tree/master/core/connectors/runtime)**.
+All the connectors are implemented as Rust libraries and can be used as a part
of the connector runtime. The runtime is responsible for managing the lifecycle
of the connectors and providing the necessary infrastructure for the connectors
to run. For more information, please refer to the **[runtime
documentation](/docs/connectors/runtime)**.
## Sink
Sinks are responsible for consuming the messages from the configured stream(s)
and topic(s) and sending them further to the specified destination. For
example, the Quickwit sink connector is responsible for sending the messages to
the Quickwit indexer.
-Please refer to the **[Sink
documentation](https://github.com/apache/iggy/tree/master/core/connectors/sinks)**
for the details about the configuration and the sample implementation.
+Please refer to the **[Sink documentation](/docs/connectors/sink)** for the
details about the configuration and the sample implementation.
When implementing `Sink`, make sure to use the `sink_connector!` macro to
expose the FFI interface that allows the connector runtime to register the
sink.
Each sink should have its own custom configuration, which is passed along
with the unique plugin ID via the expected `new()` method.
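As a sketch, a sink constructor along those lines might look like this (the
struct and field names are hypothetical; the actual SDK trait signature may
differ):

```rust
// Hypothetical sink shape: a unique plugin ID plus a custom config
// deserialized by the runtime and handed to new().
struct StdoutSinkConfig {
    print_payload: bool,
}

struct StdoutSink {
    id: u32,
    config: StdoutSinkConfig,
}

impl StdoutSink {
    // The runtime passes the unique plugin ID along with the custom config.
    fn new(id: u32, config: StdoutSinkConfig) -> Self {
        StdoutSink { id, config }
    }
}

fn main() {
    let sink = StdoutSink::new(1, StdoutSinkConfig { print_payload: true });
    println!("{} {}", sink.id, sink.config.print_payload);
}
```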
@@ -60,14 +73,14 @@ Each sink should have its own, custom configuration, which
is passed along with
Sources are responsible for producing the messages to the configured stream(s)
and topic(s). For example, the Test source connector will generate random
messages that will then be sent to the configured stream and topic.
-Please refer to the **[Source
documentation](https://github.com/apache/iggy/tree/master/core/connectors/sources)**
for the details about the configuration and the sample implementation.
+Please refer to the **[Source documentation](/docs/connectors/source)** for
the details about the configuration and the sample implementation.
## Building the connectors
-New connector can be built simply by implementing either `Sink` or `Source`
trait. Please check the
**[sink](https://github.com/apache/iggy/tree/master/core/connectors/sinks)** or
**[source](https://github.com/apache/iggy/tree/master/core/connectors/sources)**
documentation, as well as the existing examples under `/sinks` and `/sources`
directories.
+A new connector can be built simply by implementing either the `Sink` or
`Source` trait. Please check the **[sink](/docs/connectors/sink)** or
**[source](/docs/connectors/source)** documentation, as well as the existing
examples under the `/sinks` and `/sources` directories.
## Transformations
Field transformations (depending on the supported payload formats) can be
applied to the messages either before they are sent to the specified topic
(e.g. when produced by the source connectors), or before they are consumed by
the sink connectors. To add a new transformation, simply implement the
`Transform` trait and extend the existing `load` function. Each transform may
have its own custom configuration.
-To find out more about the transforms, stream decoders or encoders, please
refer to the **[SDK
documentation](https://github.com/apache/iggy/tree/master/core/connectors/sdk)**.
+To find out more about the transforms, stream decoders or encoders, please
refer to the **[SDK documentation](/docs/connectors/sdk)**.
diff --git a/docs/connectors/runtime.md b/docs/connectors/runtime.md
index 6bce1a5a..f292e52b 100644
--- a/docs/connectors/runtime.md
+++ b/docs/connectors/runtime.md
@@ -4,7 +4,6 @@ slug: /connectors/runtime
title: Runtime
sidebar_position: 2
---
-
The runtime is responsible for managing the lifecycle of the connectors and
providing the necessary infrastructure for the connectors to run.
The runtime uses a shared [Tokio runtime](https://tokio.rs) to manage the
asynchronous tasks and events across all connectors. Additionally, it has
built-in support for logging via the
[tracing](https://docs.rs/tracing/latest/tracing/) crate.
@@ -19,7 +18,7 @@ To start the connector runtime, simply run `cargo run --bin
iggy-connectors`.
The [docker image](https://hub.docker.com/r/apache/iggy-connect) is available,
and can be fetched via `docker pull apache/iggy-connect`.
-The minimal viable configuration requires at least the Iggy credentials to
create 2 separate instances of producer & consumer connections, the state
directory path where source connectors can store their optional state, and the
`connectors.config_dir` setting that specifies the directory containing
individual connector configuration files.
+The minimal viable configuration requires at least the Iggy credentials to
create two separate producer and consumer connections, the state directory
path where source connectors can store their optional state, and the
connectors configuration provider settings.
```toml
[iggy]
@@ -30,7 +29,7 @@ token = "" # Personal Access Token (PAT) can be used instead
of username and pas
[iggy.tls] # Optional TLS configuration for Iggy TCP connection
enabled = false
-ca_file = "core/certs/iggy_cert.pem"
+ca_file = "core/certs/iggy_ca_cert.pem"
domain = "" # Optional domain for TLS connection
[state]
@@ -41,10 +40,85 @@ config_type = "local"
config_dir = "path/to/connectors"
```
-Each connector (source or sink) is configured in its own separate file within
the directory specified by `connectors.config_dir`. If `config_dir` is empty or
the directory doesn't exist, no connectors will be loaded.
-
The path to the configuration can be overridden by the
`IGGY_CONNECTORS_CONFIG_PATH` environment variable. Each configuration section
can also be updated using the convention
`IGGY_CONNECTORS_SECTION_NAME_KEY_NAME`, e.g. `IGGY_CONNECTORS_IGGY_USERNAME`.
+## Configuration Providers
+
+The runtime supports two types of configuration providers for managing
connector configurations:
+
+### Local File Provider
+
+The default configuration provider reads connector configurations from local
files. Each connector (source or sink) is configured in its own separate file
within the directory specified by `connectors.config_dir`. If `config_dir` is
empty or the directory doesn't exist, no connectors will be loaded.
+
+```toml
+[connectors]
+config_type = "local"
+config_dir = "path/to/connectors"
+```
+
+### HTTP Configuration Provider
+
+The HTTP configuration provider allows the runtime to fetch connector
configurations from a remote HTTP/REST API. This enables centralized
configuration management and dynamic configuration updates.
+
+```toml
+[connectors]
+config_type = "http"
+base_url = "http://localhost:8080/api"
+timeout = "10s"
+
+[connectors.request_headers]
+api-key = "your-api-key"
+
+[connectors.retry]
+enabled = true
+max_attempts = 3
+initial_backoff = "1 s"
+max_backoff = "30 s"
+backoff_multiplier = 2
+
+[connectors.url_templates]
+# Optional: Customize URL templates for specific operations
+# If not specified, default RESTful URL patterns are used
+create_sink = "/sinks/{key}/configs"
+create_source = "/sources/{key}/configs"
+get_active_configs = "/configs/active"
+
+[connectors.response]
+# Optional: Extract data from nested response structures
+data_path = "data" # Path to data in response (e.g., {"data": {...}})
+error_path = "error" # Path to error in response (e.g., {"error": "..."})
+```
+
+#### Configuration Options
+
+- **base_url** (required): Base URL of the configuration API endpoint
+- **timeout** (optional): HTTP request timeout (default: 10s)
+- **request_headers** (optional): Custom headers to include in all HTTP
requests (e.g., authentication headers)
+- **url_templates** (optional): Custom URL templates for API endpoints.
Supports variable substitution with `{key}` and `{version}` placeholders.
+- **response.data_path** (optional): JSON path to extract response data from
nested structures (e.g., "data.config")
+- **response.error_path** (optional): JSON path to check for errors in
responses
+
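Assuming the retry settings above follow standard exponential backoff (an
assumption; the exact semantics are defined by the runtime), the delay between
attempts could be computed as:

```rust
// Sketch of exponential backoff with the settings from the example above:
// initial_backoff = 1s, backoff_multiplier = 2, max_backoff = 30s.
// Assumed semantics; not the runtime's actual implementation.
fn backoff_ms(attempt: u32, initial_ms: u64, multiplier: u64, max_ms: u64) -> u64 {
    initial_ms
        .saturating_mul(multiplier.saturating_pow(attempt))
        .min(max_ms)
}

fn main() {
    // attempts 0..=5 -> 1000, 2000, 4000, 8000, 16000, 30000 (capped)
    for attempt in 0..6 {
        println!("{}", backoff_ms(attempt, 1000, 2, 30_000));
    }
}
```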
+#### Default URL Templates
+
+If not customized, the HTTP provider uses the following RESTful URL patterns:
+
+- Create sink config: `POST {base_url}/sinks/{key}/configs`
+- Create source config: `POST {base_url}/sources/{key}/configs`
+- Get active configs: `GET {base_url}/configs/active`
+- Get active versions: `GET {base_url}/configs/active/versions`
+- Set active sink version: `PUT {base_url}/sinks/{key}/configs/active`
+- Set active source version: `PUT {base_url}/sources/{key}/configs/active`
+- Get sink configs: `GET {base_url}/sinks/{key}/configs`
+- Get sink config by version: `GET {base_url}/sinks/{key}/configs/{version}`
+- Get active sink config: `GET {base_url}/sinks/{key}/configs/active`
+- Get source configs: `GET {base_url}/sources/{key}/configs`
+- Get source config by version: `GET
{base_url}/sources/{key}/configs/{version}`
+- Get active source config: `GET {base_url}/sources/{key}/configs/active`
+- Delete sink config: `DELETE {base_url}/sinks/{key}/configs`
+- Delete source config: `DELETE {base_url}/sources/{key}/configs`
+
+The HTTP provider expects the remote API to implement these endpoints and
return connector configuration data in the same format as used by the local
provider.
+
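The `{key}` and `{version}` placeholders are plain textual substitutions, so a
custom template can be rendered with simple string replacement. A minimal
sketch (illustration only):

```rust
// Render a URL template by substituting the {key} and {version} placeholders.
fn render(template: &str, key: &str, version: &str) -> String {
    template.replace("{key}", key).replace("{version}", version)
}

fn main() {
    println!("{}", render("/sinks/{key}/configs/{version}", "postgres", "3"));
}
```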
## HTTP API
The connector runtime has an optional HTTP API that can be enabled by setting
the `enabled` flag to `true` in the `[http]` section.
@@ -76,9 +150,19 @@ Currently, it does expose the following endpoints:
- `GET /health`: health status of the runtime.
- `GET /sinks`: list of sinks.
- `GET /sinks/{key}`: sink details.
-- `GET /sinks/{key}/config`: sink config, including the optional `format`
query parameter to specify the config format.
+- `GET /sinks/{key}/configs`: list of configuration versions for the sink.
+- `POST /sinks/{key}/configs`: add a new configuration version for the sink.
+- `GET /sinks/{key}/configs/{version}`: configuration details for a specific
version.
+- `GET /sinks/{key}/configs/active`: active configuration details.
+- `PUT /sinks/{key}/configs/active`: activate a specific configuration version
for the sink.
+- `GET /sinks/{key}/configs/plugin`: sink plugin config, including the
optional `format` query parameter to specify the config format.
- `GET /sinks/{key}/transforms`: sink transforms to be applied to the fields.
- `GET /sources`: list of sources.
- `GET /sources/{key}`: source details.
-- `GET /sources/{key}/config`: source config, including the optional `format`
query parameter to specify the config format.
+- `GET /sources/{key}/configs`: list of configuration versions for the source.
+- `POST /sources/{key}/configs`: add a new configuration version for the
source.
+- `GET /sources/{key}/configs/{version}`: configuration details for a specific
version.
+- `GET /sources/{key}/configs/active`: active configuration details.
+- `PUT /sources/{key}/configs/active`: activate a specific configuration
version for the source.
+- `GET /sources/{key}/configs/plugin`: source plugin config, including the
optional `format` query parameter to specify the config format.
- `GET /sources/{key}/transforms`: source transforms to be applied to the
fields.
diff --git a/docs/connectors/sdk.md b/docs/connectors/sdk.md
index fd894a69..186964c2 100644
--- a/docs/connectors/sdk.md
+++ b/docs/connectors/sdk.md
@@ -9,15 +9,15 @@ SDK provides the commonly used structs and traits such as
`Sink` and `Source`, a
Moreover, it contains both the `decoders` and `encoders` modules, implementing
the `StreamDecoder` and `StreamEncoder` traits, which are used when consuming
or producing data from/to Iggy streams.
-SDK is WiP, and it'd certainly benefit from having the support of multiple
format schemas, such as Protobuf, Avro, Flatbuffers etc. including
decoding/encoding the data between the different formats (when applicable) and
supporting the data transformations whenever possible (easy for JSON, but
complex for Bincode for example).
+The SDK is a work in progress, and it would certainly benefit from supporting
multiple schema formats, such as Avro, Flatbuffers, etc., including
decoding/encoding the data between the different formats (when applicable) and
supporting the data transformations whenever possible (easy for JSON, but
complex for Bincode, for example).
-Last but not least, the different `transforms` are available, to transform
(add, update, delete etc.) the particular fields of the data being processed
via external configuration. It's as simple as adding a new transform to the
`transforms` section of the particular connector configuration:
+Last but not least, various `transforms` are available to transform (add,
update, delete, etc.) particular fields of the data being processed, via
external configuration. It's as simple as adding a new transform to the
`transforms` section of the particular connector configuration file:
```toml
-[sources.random.transforms.add_fields]
+[transforms.add_fields]
enabled = true
-[[sources.random.transforms.add_fields.fields]]
+[[transforms.add_fields.fields]]
key = "message"
value.static = "hello"
```
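Conceptually, the `add_fields` transform above inserts the configured
key/value pairs into each processed record. A rough sketch of that behavior
(the function and types are illustrative, not the SDK's actual API):

```rust
use std::collections::BTreeMap;

// Illustrative sketch of what an add_fields transform conceptually does:
// insert the configured static key/value pairs into each decoded record.
fn add_fields(
    mut record: BTreeMap<String, String>,
    fields: &[(&str, &str)],
) -> BTreeMap<String, String> {
    for (key, value) in fields {
        record.insert((*key).to_string(), (*value).to_string());
    }
    record
}

fn main() {
    let record = BTreeMap::from([("id".to_string(), "1".to_string())]);
    // Matches the config above: key = "message", value.static = "hello".
    let out = add_fields(record, &[("message", "hello")]);
    println!("{:?}", out);
}
```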
@@ -28,7 +28,9 @@ The SDK includes support for Protocol Buffers (protobuf)
format with both encodi
### Configuration Example
-Here's a complete example configuration for using Protocol Buffers with Iggy
connectors:
+Here's a complete example configuration for using Protocol Buffers with Iggy
connectors.
+
+**Main runtime config (config.toml):**
```toml
[iggy]
@@ -36,41 +38,57 @@ address = "localhost:8090"
username = "iggy"
password = "iggy"
-[sources.protobuf_source]
+[connectors]
+config_type = "local"
+config_dir = "path/to/connectors"
+```
+
+**Source connector config (connectors/protobuf_source.toml):**
+
+```toml
+type = "source"
+key = "protobuf"
enabled = true
+version = 0
name = "Protobuf Source"
path = "target/release/libiggy_connector_protobuf_source"
-[[sources.protobuf_source.streams]]
+[[streams]]
stream = "protobuf_stream"
topic = "protobuf_topic"
schema = "proto"
batch_size = 1000
send_interval = "5ms"
-[sources.protobuf_source.config]
+[plugin_config]
schema_path = "schemas/message.proto"
message_type = "com.example.Message"
use_any_wrapper = true
+```
-[sinks.protobuf_sink]
+**Sink connector config (connectors/protobuf_sink.toml):**
+
+```toml
+type = "sink"
+key = "protobuf"
enabled = true
+version = 0
name = "Protobuf Sink"
path = "target/release/libiggy_connector_protobuf_sink"
-[[sinks.protobuf_sink.streams]]
+[[streams]]
stream = "protobuf_stream"
topic = "protobuf_topic"
schema = "proto"
-[[sinks.protobuf_sink.transforms]]
+[[transforms]]
type = "proto_convert"
target_format = "json"
preserve_structure = true
field_mappings = { "old_field" = "new_field", "legacy_id" = "id" }
-[[sinks.protobuf_sink.transforms]]
+[[transforms]]
type = "proto_convert"
target_format = "proto"
preserve_structure = false
diff --git a/docs/connectors/sinks/_category_.json
b/docs/connectors/sinks/_category_.json
new file mode 100644
index 00000000..4fb349e7
--- /dev/null
+++ b/docs/connectors/sinks/_category_.json
@@ -0,0 +1,4 @@
+{
+ "label": "Sinks",
+ "position": 4
+}
diff --git a/docs/connectors/sinks/elasticsearch.md
b/docs/connectors/sinks/elasticsearch.md
new file mode 100644
index 00000000..d44f2909
--- /dev/null
+++ b/docs/connectors/sinks/elasticsearch.md
@@ -0,0 +1,50 @@
+---
+id: connectors-sinks-elasticsearch
+slug: /connectors/sinks/elasticsearch
+title: Elasticsearch Sink
+sidebar_position: 2
+---
+
+A sink connector that consumes messages from Iggy streams and indexes them
into Elasticsearch.
+
+## Configuration
+
+- `url`: Elasticsearch cluster URL
+- `index`: Target index name
+- `username/password`: Optional authentication credentials
+- `batch_size`: Bulk indexing batch size (default: 100)
+- `timeout_seconds`: Request timeout (default: 30s)
+- `create_index_if_not_exists`: Automatically create index (default: true)
+- `index_mapping`: Index mapping configuration
+
+```toml
+[plugin_config]
+url = "http://localhost:9200"
+index = "events"
+username = "elastic"
+password = "changeme"
+batch_size = 100
+timeout_seconds = 30
+create_index_if_not_exists = true
+index_mapping = """
+{
+ "mappings": {
+ "properties": {
+ "timestamp": { "type": "date" },
+ "message": { "type": "text" },
+ "service_name": { "type": "keyword" },
+ "user_id": { "type": "keyword" },
+ "level": { "type": "keyword" }
+ }
+ }
+}
+"""
+```
+
+## Features
+
+- Bulk indexing optimization
+- Automatic index creation
+- Error handling and retry mechanisms
+- Metadata field injection
+- Support for multiple data formats
diff --git a/docs/connectors/sinks/iceberg.md b/docs/connectors/sinks/iceberg.md
new file mode 100644
index 00000000..70307e5b
--- /dev/null
+++ b/docs/connectors/sinks/iceberg.md
@@ -0,0 +1,87 @@
+---
+id: connectors-sinks-iceberg
+slug: /connectors/sinks/iceberg
+title: Iceberg Sink
+sidebar_position: 3
+---
+
+The Iceberg Sink Connector allows you to consume messages from Iggy topics and
store them in Iceberg tables.
+
+## Features
+
+- **Support for S3-compatible storage**
+- **Support for REST catalogs**
+- **Single destination table**
+- **Multiple-table fan-out static routing**
+- **Multiple-table fan-out dynamic routing**
+
+## Configuration example
+
+```toml
+[plugin_config]
+tables = ["nyc.users"]
+catalog_type = "rest"
+warehouse = "warehouse"
+uri = "http://localhost:8181"
+dynamic_routing = true
+dynamic_route_field = "db_table"
+store_url = "http://localhost:9000"
+store_access_key_id = "admin"
+store_secret_access_key = "password"
+store_region = "us-east-1"
+store_class = "s3"
+```
+
+## Configuration Options
+
+- **tables**: The names of the Iceberg tables you want to statically route
Iggy messages to. Each name should include the table's namespace, separated by
a dot (`.`).
+- **catalog_type**: The type of catalog you are routing data to. **Currently,
only REST catalogs are fully supported.**
+- **warehouse**: The name of the bucket or warehouse where Iggy will upload
data files.
+- **uri**: The URI of the Iceberg catalog.
+- **dynamic_routing**: Enables dynamic routing. See more details later in this
document.
+- **dynamic_route_field**: The name of the message field that specifies the
Iceberg table to route data to. See more details below.
+- **store_url**: The URL of the object storage for data uploads.
+- **store_access_key_id**: The access key ID of the object storage.
+- **store_secret_access_key**: The secret key used to upload data to the
object storage.
+- **store_region**: The region of the object storage, if applicable.
+- **store_class**: The storage class to use. **Currently, only S3-compatible
storage is supported.**
+
+## Dynamic Routing
+
+If you don't know the names of the Iceberg tables you want to route data to in
advance, you can use the dynamic routing feature.
+Insert a field in your Iggy messages with the name of the Iceberg table the
message should be routed to. The Iggy connector will parse this field at
runtime and route the message to the correct table.
+
+The Iggy Iceberg Connector will skip messages in the following cases:
+
+- The table declared in the message field does not exist.
+- The message does not contain the field specified in the
`dynamic_route_field` configuration option.
+
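The skip rules above amount to a simple lookup. A hypothetical sketch of the
routing decision (not the connector's actual code):

```rust
// Hypothetical sketch of the dynamic-routing decision described above:
// skip when the route field is missing or names an unknown table.
fn route<'a>(route_field: Option<&'a str>, known_tables: &[&str]) -> Option<&'a str> {
    let table = route_field?; // skip: message lacks the dynamic_route_field
    if !known_tables.iter().any(|t| *t == table) {
        return None; // skip: declared table does not exist
    }
    Some(table)
}

fn main() {
    println!("{:?}", route(Some("nyc.users"), &["nyc.users"]));
}
```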
+### Dynamic routing configuration example
+
+```toml
+[plugin_config]
+tables = [""]
+catalog_type = "rest"
+warehouse = "warehouse"
+uri = "http://localhost:8181"
+dynamic_routing = true
+dynamic_route_field = "db_table"
+store_url = "http://localhost:9000"
+store_access_key_id = "admin"
+store_secret_access_key = "password"
+store_region = "us-east-1"
+store_class = "s3"
+
+[transforms.add_fields]
+enabled = true
+
+[[transforms.add_fields.fields]]
+key = "db_table"
+value.static = "nyc.users"
+```
+
+**Note:** The value in the message field **must** contain both the namespace
and the table name, separated by a dot (`.`).
+Example:
+
+- Namespace: `nyc`
+- Table name: `users`
diff --git a/docs/connectors/sinks/postgres.md
b/docs/connectors/sinks/postgres.md
new file mode 100644
index 00000000..9f56f530
--- /dev/null
+++ b/docs/connectors/sinks/postgres.md
@@ -0,0 +1,193 @@
+---
+id: connectors-sinks-postgres
+slug: /connectors/sinks/postgres
+title: Postgres Sink
+sidebar_position: 4
+---
+
+The PostgreSQL sink connector allows you to consume messages from Iggy topics
and store them in PostgreSQL databases.
+
+## Features
+
+- **Automatic Table Creation**: Optionally create tables automatically
+- **Batch Processing**: Insert messages in configurable batches for performance
+- **Metadata Storage**: Store Iggy message metadata (offset, timestamp, topic,
etc.)
+- **Raw Payload Storage**: Store original message payload as raw bytes
+- **Flexible Data Handling**: Works with any payload type (JSON, text, binary,
protobuf, etc.)
+- **Connection Pooling**: Efficient database connection management
+
+## Configuration
+
+```json
+{
+ "connection_string":
"postgresql://username:password@localhost:5432/database",
+ "target_table": "iggy_messages",
+ "batch_size": 100,
+ "max_connections": 10,
+ "auto_create_table": true,
+ "include_metadata": true,
+ "include_checksum": true,
+ "include_origin_timestamp": true
+}
+```
+
+### Configuration Options
+
+- `connection_string`: PostgreSQL connection string
+- `target_table`: Name of the table to insert messages into
+- `batch_size`: Number of messages to insert in each batch (default: 100)
+- `max_connections`: Maximum database connections (default: 10)
+- `auto_create_table`: Automatically create the target table if it doesn't
exist (default: false)
+- `include_metadata`: Include Iggy metadata columns (default: true)
+- `include_checksum`: Include message checksum (default: true)
+- `include_origin_timestamp`: Include original message timestamp (default:
true)
+
+## Table Schema
+
+When `auto_create_table` is enabled, the following table structure is created:
+
+```sql
+CREATE TABLE iggy_messages (
+ id DECIMAL(39, 0) PRIMARY KEY,
+ iggy_offset BIGINT,
+ iggy_timestamp TIMESTAMP WITH TIME ZONE,
+ iggy_stream TEXT,
+ iggy_topic TEXT,
+ iggy_partition_id INTEGER,
+ iggy_checksum BIGINT,
+ iggy_origin_timestamp TIMESTAMP WITH TIME ZONE,
+ payload BYTEA,
+ created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
+);
+```
+
+## Data Storage
+
+The payload is stored as raw bytes in the `payload` column, regardless of the
original format:
+
+### JSON Messages
+
+JSON payloads are stored as UTF-8 encoded bytes. You can query them using
PostgreSQL's JSON functions:
+
+```sql
+SELECT id, payload::text::jsonb->>'user_id' as user_id
+FROM iggy_messages
+WHERE payload::text::jsonb->>'user_id' IS NOT NULL;
+```
+
+### Text Messages
+
+Text payloads are stored as UTF-8 encoded bytes:
+
+```sql
+SELECT id, convert_from(payload, 'UTF8') as message_text
+FROM iggy_messages;
+```
+
+### Binary Messages
+
+Binary data is stored directly as bytes:
+
+```sql
+SELECT id, encode(payload, 'base64') as payload_base64
+FROM iggy_messages;
+```
+
+### Protocol Buffer Messages
+
+Protobuf messages are stored as raw bytes and can be processed by your
application:
+
+```sql
+SELECT id, payload, length(payload) as payload_size
+FROM iggy_messages;
+```
+
+## Usage Example
+
+1. Configure the sink connector in your Iggy connectors runtime
+2. Messages consumed from the specified topics will be inserted into PostgreSQL
+3. Query the data using standard SQL:
+
+```sql
+-- Get all messages from a specific stream
+SELECT * FROM iggy_messages WHERE iggy_stream = 'user_events';
+
+-- Get messages with their payload as text (for text/JSON payloads)
+SELECT id, iggy_offset, convert_from(payload, 'UTF8') as payload_text
+FROM iggy_messages
+WHERE iggy_stream = 'user_events';
+
+-- Get messages from a specific time range
+SELECT * FROM iggy_messages
+WHERE created_at >= '2024-01-01'
+AND created_at < '2024-02-01';
+
+-- Get payload size statistics
+SELECT
+ iggy_stream,
+ iggy_topic,
+ COUNT(*) as message_count,
+ AVG(length(payload)) as avg_payload_size,
+ MAX(length(payload)) as max_payload_size
+FROM iggy_messages
+GROUP BY iggy_stream, iggy_topic;
+```
+
+## Performance Considerations
+
+- Use appropriate `batch_size` for your workload (larger batches = better
throughput)
+- Consider creating indexes on frequently queried columns
+- Monitor connection pool usage with `max_connections`
+- Create indexes for efficient queries:
+
+```sql
+CREATE INDEX idx_iggy_messages_stream ON iggy_messages (iggy_stream);
+CREATE INDEX idx_iggy_messages_topic ON iggy_messages (iggy_topic);
+CREATE INDEX idx_iggy_messages_created_at ON iggy_messages (created_at);
+CREATE INDEX idx_iggy_messages_offset ON iggy_messages (iggy_offset);
+```
+
+## Working with Different Payload Types
+
+### JSON Payloads
+
+For JSON data, you can use PostgreSQL's JSON operators:
+
+```sql
+-- Extract JSON fields (assuming payload is JSON)
+SELECT
+ id,
+ payload::text::jsonb->>'name' as name,
+ payload::text::jsonb->>'email' as email
+FROM iggy_messages
+WHERE payload::text::jsonb->>'name' IS NOT NULL;
+
+-- Create a GIN index for faster JSON queries
+CREATE INDEX idx_iggy_messages_payload_gin ON iggy_messages USING GIN
((payload::text::jsonb));
+```
+
+### Text Payloads
+
+For text data, convert bytes to text:
+
+```sql
+-- Full-text search on text payloads
+SELECT id, convert_from(payload, 'UTF8') as message
+FROM iggy_messages
+WHERE convert_from(payload, 'UTF8') LIKE '%error%';
+```
+
+### Binary Payloads
+
+For binary data, work with raw bytes or encode as needed:
+
+```sql
+-- Get binary payload as hex
+SELECT id, encode(payload, 'hex') as payload_hex
+FROM iggy_messages;
+
+-- Get payload size
+SELECT id, length(payload) as payload_size
+FROM iggy_messages
+ORDER BY payload_size DESC;
+```
diff --git a/docs/connectors/sinks/quickwit.md
b/docs/connectors/sinks/quickwit.md
new file mode 100644
index 00000000..a97d64e7
--- /dev/null
+++ b/docs/connectors/sinks/quickwit.md
@@ -0,0 +1,67 @@
+---
+id: connectors-sinks-quickwit
+slug: /connectors/sinks/quickwit
+title: Quickwit Sink
+sidebar_position: 5
+---
+
+The Quickwit connector allows you to send data to the Quickwit API using HTTP.
This sink will ensure that the index exists (create it if it doesn't) and will
append the data to the index using the same batch size as specified in the Iggy
configuration.
+
+## Configuration
+
+- `url`: The URL of the Quickwit server.
+- `index`: The index configuration using YAML, as described in the [Quickwit
index configuration docs](https://quickwit.io/docs/configuration/index-config)
+
+```toml
+[plugin_config]
+url = "http://localhost:7280"
+index = """
+version: 0.9
+
+index_id: events
+
+doc_mapping:
+ mode: strict
+ field_mappings:
+ - name: timestamp
+ type: datetime
+ input_formats: [unix_timestamp]
+ output_format: unix_timestamp_nanos
+ indexed: false
+ fast: true
+ fast_precision: milliseconds
+ - name: service_name
+ type: text
+ tokenizer: raw
+ fast: true
+ - name: random_id
+ type: text
+ tokenizer: raw
+ fast: true
+ - name: user_id
+ type: text
+ tokenizer: raw
+ fast: true
+ - name: user_type
+ type: u64
+ fast: true
+ - name: source
+ type: text
+ tokenizer: default
+ - name: state
+ type: text
+ tokenizer: default
+ - name: message
+ type: text
+ tokenizer: default
+
+ timestamp_field: timestamp
+
+indexing_settings:
+ commit_timeout_secs: 10
+
+retention:
+ period: 7 days
+ schedule: daily
+"""
+```
diff --git a/docs/connectors/sink.md b/docs/connectors/sinks/sink.md
similarity index 95%
rename from docs/connectors/sink.md
rename to docs/connectors/sinks/sink.md
index 39efddd1..ba8130c7 100644
--- a/docs/connectors/sink.md
+++ b/docs/connectors/sinks/sink.md
@@ -2,7 +2,7 @@
id: connectors-sink
slug: /connectors/sink
title: Sink
-sidebar_position: 4
+sidebar_position: 1
---
## Overview
@@ -36,15 +36,15 @@ Each sink connector is configured in its own separate
configuration file within
```rust
pub struct SinkConfig {
- pub id: String,
+ pub key: String,
pub enabled: bool,
pub version: u64,
pub name: String,
pub path: String,
pub transforms: Option<TransformsConfig>,
pub streams: Vec<StreamConsumerConfig>,
- pub config_format: Option<ConfigFormat>,
- pub config: Option<serde_json::Value>,
+ pub plugin_config_format: Option<ConfigFormat>,
+ pub plugin_config: Option<serde_json::Value>,
}
```
@@ -61,14 +61,14 @@ config_dir = "path/to/connectors"
```toml
# Type of connector (sink or source)
type = "sink"
-id = "stdout" # Unique sink ID
+key = "stdout" # Unique sink ID
# Required configuration for a sink connector
enabled = true
version = 0
name = "Stdout sink"
path = "target/release/libiggy_connector_stdout_sink"
-config_format = "toml"
+plugin_config_format = "toml"
# Collection of the streams from which messages are consumed
[[streams]]
@@ -79,8 +79,8 @@ batch_length = 100
poll_interval = "5ms"
consumer_group = "stdout_sink_connector"
-# Custom configuration for the sink connector, deserialized to type T from
`config` field
-[config]
+# Custom configuration for the sink connector, deserialized to type T from
`plugin_config` field
+[plugin_config]
print_payload = true
# Optional data transformation(s) to be applied after consuming messages from
the stream
@@ -95,7 +95,7 @@ value.static = "hello"
### Environment Variable Overrides
-Configuration properties can be overridden using environment variables. The
pattern follows: `IGGY_CONNECTORS_SINK_[ID]_[PROPERTY]`
+Configuration properties can be overridden using environment variables. The
pattern follows: `IGGY_CONNECTORS_SINK_[KEY]_[PROPERTY]`
For example, to override the `enabled` property for a sink with key `stdout`:
diff --git a/docs/connectors/sources/_category_.json
b/docs/connectors/sources/_category_.json
new file mode 100644
index 00000000..58e9f240
--- /dev/null
+++ b/docs/connectors/sources/_category_.json
@@ -0,0 +1,4 @@
+{
+ "label": "Sources",
+ "position": 5
+}
diff --git a/docs/connectors/sources/elasticsearch.md
b/docs/connectors/sources/elasticsearch.md
new file mode 100644
index 00000000..760bebed
--- /dev/null
+++ b/docs/connectors/sources/elasticsearch.md
@@ -0,0 +1,280 @@
+---
+id: connectors-sources-elasticsearch
+slug: /connectors/sources/elasticsearch
+title: Elasticsearch Source
+sidebar_position: 2
+---
+
+The Elasticsearch source connector fetches documents from Elasticsearch and provides comprehensive state management to track processing progress and enable fault-tolerant data ingestion.
+
+## Features
+
+- **Incremental Data Processing**: Track last processed timestamp to avoid
reprocessing data
+- **Cursor-based Pagination**: Support for document ID-based cursors
+- **Scroll-based Pagination**: Support for Elasticsearch scroll API
+- **Error Tracking**: Monitor error counts and last error messages
+- **Processing Statistics**: Track performance metrics and processing times
+- **Persistent State Storage**: Multiple storage backends (file,
Elasticsearch, Redis)
+- **Auto-save**: Configurable automatic state persistence
+- **State Recovery**: Resume processing from last known position after restart
+
+## Configuration
+
+### Basic Configuration
+
+```toml
+type = "source"
+key = "elasticsearch"
+enabled = true
+version = 0
+name = "Elasticsearch source"
+path = "target/release/libiggy_connector_elasticsearch_source"
+
+[[streams]]
+stream = "elasticsearch_stream"
+topic = "documents"
+schema = "json"
+batch_length = 100
+linger_time = "5ms"
+
+[plugin_config]
+url = "http://localhost:9200"
+index = "logs-*"
+polling_interval = "30s"
+batch_size = 100
+timestamp_field = "@timestamp"
+query = { match_all = {} }
+```
+
+### State Management Configuration
+
+```toml
+[plugin_config]
+# ... basic config ...
+[plugin_config.state]
+enabled = true
+storage_type = "file" # "file", "elasticsearch", "redis"
+state_id = "elasticsearch_logs_connector"
+auto_save_interval = "5m"
+tracked_fields = [
+  "last_poll_timestamp",
+  "last_document_id",
+  "total_documents_fetched"
+]
+
+[plugin_config.state.storage_config]
+base_path = "./connector_states" # for file storage
+# index = "connector_states" # for elasticsearch storage
+# url = "redis://localhost:6379" # for redis storage
+```
+
+## State Information
+
+The connector tracks the following state information:
+
+### Processing State
+
+- `last_poll_timestamp`: Last successful poll timestamp
+- `total_documents_fetched`: Total number of documents processed
+- `poll_count`: Number of polling cycles executed
+- `last_document_id`: Last processed document ID (for cursor pagination)
+- `last_scroll_id`: Last scroll ID (for scroll pagination)
+- `last_offset`: Last processed offset
+
+### Error Tracking
+
+- `error_count`: Total number of errors encountered
+- `last_error`: Last error message
+
+### Performance Statistics
+
+- `total_bytes_processed`: Total bytes processed
+- `avg_batch_processing_time_ms`: Average processing time per batch
+- `last_successful_poll`: Timestamp of last successful poll
+- `empty_polls_count`: Number of polls that returned no documents
+- `successful_polls_count`: Number of successful polls
+
+## Storage Backends
+
+### File Storage (Default)
+
+```toml
+[plugin_config.state]
+enabled = true
+storage_type = "file"
+
+[plugin_config.state.storage_config]
+base_path = "./connector_states"
+```
+
+### Elasticsearch Storage
+
+```toml
+[plugin_config.state]
+enabled = true
+storage_type = "elasticsearch"
+
+[plugin_config.state.storage_config]
+index = "connector_states"
+url = "http://localhost:9200"
+```
+
+### Redis Storage
+
+```toml
+[plugin_config.state]
+enabled = true
+storage_type = "redis"
+
+[plugin_config.state.storage_config]
+url = "redis://localhost:6379"
+key_prefix = "connector_states:"
+```
+
+## Usage Examples
+
+### Basic Usage with State Management
+
+```rust
+use elasticsearch_source::{ElasticsearchSource, StateManagerExt};
+
+// Create connector with state management enabled
+let mut connector = ElasticsearchSource::new(id, config);
+
+// Open connector (automatically loads state if available)
+connector.open().await?;
+
+// Start polling (automatically saves state)
+let messages = connector.poll().await?;
+
+// Close connector (automatically saves final state)
+connector.close().await?;
+```
+
+### Manual State Management
+
+```rust
+use elasticsearch_source::{ElasticsearchSource, StateManagerExt};
+
+let mut connector = ElasticsearchSource::new(id, config);
+
+// Load state manually
+connector.load_state().await?;
+
+// Get current state
+let state = connector.get_state().await?;
+println!("Current state: {:?}", state);
+
+// Export state to JSON
+let state_json = connector.export_state().await?;
+println!("State JSON: {}", serde_json::to_string_pretty(&state_json)?);
+
+// Import state from JSON
+connector.import_state(state_json).await?;
+
+// Reset state
+connector.reset_state().await?;
+```
+
+### State Manager Utilities
+
+```rust
+use elasticsearch_source::{ElasticsearchSource, StateManagerExt};
+
+let connector = ElasticsearchSource::new(id, config);
+
+// Get state manager
+if let Some(state_manager) = connector.get_state_manager() {
+ // Get state statistics
+ let stats = state_manager.get_state_stats().await?;
+ println!("Total states: {}", stats.total_states);
+
+ // Clean up old states (older than 30 days)
+ let deleted_count = state_manager.cleanup_old_states(30).await?;
+ println!("Deleted {} old states", deleted_count);
+}
+```
+
+## State File Format
+
+State files are stored as JSON with the following structure:
+
+```json
+{
+ "id": "elasticsearch_logs_connector",
+ "last_updated": "2024-01-15T10:30:00Z",
+ "version": 1,
+ "data": {
+ "last_poll_timestamp": "2024-01-15T10:30:00Z",
+ "total_documents_fetched": 15000,
+ "poll_count": 150,
+ "last_document_id": "doc_12345",
+ "last_scroll_id": "scroll_abc123",
+ "last_offset": 15000,
+ "error_count": 2,
+ "last_error": "Connection timeout",
+ "processing_stats": {
+ "total_bytes_processed": 1048576,
+ "avg_batch_processing_time_ms": 125.5,
+ "last_successful_poll": "2024-01-15T10:30:00Z",
+ "empty_polls_count": 5,
+ "successful_polls_count": 145
+ }
+ },
+ "metadata": {
+ "connector_type": "elasticsearch_source",
+ "connector_id": 1,
+ "index": "logs-*",
+ "url": "http://localhost:9200"
+ }
+}
+```
+
+## Best Practices
+
+1. **State ID Uniqueness**: Use unique state IDs for different connector
instances
+2. **Auto-save Interval**: Set appropriate auto-save intervals based on your
data volume
+3. **Storage Location**: Use persistent storage locations for production
deployments
+4. **State Cleanup**: Regularly clean up old state files to prevent disk space
issues
+5. **Error Handling**: Monitor error counts and implement appropriate alerting
+6. **Backup**: Regularly backup state files for disaster recovery
+
+## Troubleshooting
+
+### Common Issues
+
+1. **State Not Loading**: Check file permissions and storage path
+2. **State Corruption**: Delete corrupted state files to start fresh
+3. **Performance Issues**: Adjust auto-save interval and batch sizes
+4. **Storage Full**: Implement state cleanup policies
+
+### Monitoring
+
+Monitor the following metrics:
+
+- State save/load success rates
+- Processing statistics
+- Error counts and types
+- Storage usage for state files
+
+## Migration
+
+To migrate from a connector without state management:
+
+1. Add state configuration to your connector config
+2. Set `enabled = true` in state config
+3. Restart the connector
+4. The connector will start tracking state from the next poll cycle
+
+To migrate between storage backends:
+
+1. Export state from current storage
+2. Update storage configuration
+3. Import state to new storage
+4. Restart connector
diff --git a/docs/connectors/sources/postgres.md
b/docs/connectors/sources/postgres.md
new file mode 100644
index 00000000..6ada771f
--- /dev/null
+++ b/docs/connectors/sources/postgres.md
@@ -0,0 +1,100 @@
+---
+id: connectors-sources-postgres
+slug: /connectors/sources/postgres
+title: Postgres Source
+sidebar_position: 3
+---
+
+The PostgreSQL source connector allows you to fetch data from PostgreSQL
databases and stream it to Iggy topics. It supports both table polling and
Change Data Capture (CDC) modes.
+
+## Features
+
+- **Table Polling**: Incrementally fetch data from PostgreSQL tables
+- **Change Data Capture**: Monitor database changes using PostgreSQL logical
replication
+ - Built-in logical decoding using `pg_logical_slot_get_changes`
+ - Optional backend using Supabase's ETL (`pg_replicate`) framework via
feature flag
+- **Configurable Polling Intervals**: Control how often to check for new data
+- **Batch Processing**: Fetch data in configurable batch sizes
+- **Offset Tracking**: Keep track of processed records to avoid duplicates
+- **Multiple Tables**: Monitor multiple tables simultaneously
+- **Column Mapping**: Transform column names (e.g., to snake_case)
+- **Custom Queries**: Use custom SQL queries instead of simple table polling
+
+## Configuration
+
+```json
+{
+ "connection_string":
"postgresql://username:password@localhost:5432/database",
+ "mode": "polling",
+ "tables": ["users", "orders", "products"],
+ "poll_interval": "30s",
+ "batch_size": 1000,
+ "tracking_column": "updated_at",
+ "initial_offset": "2024-01-01T00:00:00Z",
+ "max_connections": 10,
+ "enable_wal_cdc": false,
+ "custom_query": "SELECT * FROM users WHERE updated_at > $1 ORDER BY
updated_at LIMIT $2",
+ "snake_case_columns": true,
+ "include_metadata": true,
+ "publication_name": "iggy_publication",
+ "replication_slot": "iggy_slot",
+ "capture_operations": ["INSERT", "UPDATE", "DELETE"],
+ "cdc_backend": "builtin"
+}
+```
+
+### Configuration Options
+
+- `connection_string`: PostgreSQL connection string
+- `mode`: Operation mode - `polling` or `cdc`
+- `tables`: List of tables to monitor (used for polling and to create
publications in CDC)
+- `poll_interval`: How often to poll for new data (e.g., "30s", "5m")
+- `batch_size`: Maximum number of rows to fetch per poll (default: 1000)
+- `tracking_column`: Column to track for incremental updates (default: "id")
+- `initial_offset`: Starting value for the tracking column
+- `max_connections`: Maximum database connections (default: 10)
+- `enable_wal_cdc`: Enable WAL-based CDC (requires logical replication setup)
+- `custom_query`: Custom SQL query (overrides table + tracking_column)
+- `snake_case_columns`: Convert column names to snake_case (default: false)
+- `include_metadata`: Wrap results with metadata (default: true)
+- `publication_name`: Logical replication publication name (default:
`iggy_publication`)
+- `replication_slot`: Replication slot name (default: `iggy_slot`)
+- `capture_operations`: Operations to capture in CDC mode (default:
["INSERT","UPDATE","DELETE"])
+- `cdc_backend`: CDC backend implementation: `builtin` (default) or
`pg_replicate`
+
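As a rough illustration of what the `snake_case_columns` option implies, a conversion along these lines maps camelCase column names to snake_case (a sketch only; the connector's actual mapping rules may differ):

```rust
/// Illustrative snake_case conversion; the connector's actual rules may differ.
fn to_snake_case(name: &str) -> String {
    let mut out = String::new();
    for (i, ch) in name.chars().enumerate() {
        if ch.is_uppercase() {
            // Split words on an upper-case boundary: "createdAt" -> "created_at"
            if i > 0 && !out.ends_with('_') {
                out.push('_');
            }
            out.extend(ch.to_lowercase());
        } else {
            out.push(ch);
        }
    }
    out
}

fn main() {
    assert_eq!(to_snake_case("createdAt"), "created_at");
    assert_eq!(to_snake_case("UserId"), "user_id");
}
```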
+### pg_replicate backend
+
+- Build-time feature flag: enable with `--features cdc_pg_replicate` on the
`iggy_connector_postgres_source` crate.
+- When `cdc_backend` is set to `pg_replicate` but the feature is not enabled,
the connector will return a clear initialization error.
+- The backend integrates Supabase's ETL for robust WAL decoding and resume
from LSN.
+
+### Notes
+
+- CDC requires `wal_level = logical` on the PostgreSQL server.
+- Ensure a direct connection (no pooler) for replication where required.
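
The notes above translate into server-side setup roughly as follows (an illustrative sketch: the publication and slot names match the connector defaults `iggy_publication` and `iggy_slot`, the decoding plugin may differ per CDC backend, and the connector may create these objects itself):

```sql
-- Changing wal_level requires a PostgreSQL server restart
ALTER SYSTEM SET wal_level = 'logical';

-- Publication covering the monitored tables (matches the `publication_name` default)
CREATE PUBLICATION iggy_publication FOR TABLE users, orders, products;

-- Logical replication slot (matches the `replication_slot` default)
SELECT pg_create_logical_replication_slot('iggy_slot', 'pgoutput');
```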
+
+## Output Format
+
+Each message contains:
+
+```json
+{
+ "table_name": "users",
+ "operation_type": "SELECT",
+ "timestamp": "2024-01-15T10:30:00Z",
+ "data": {
+ "id": 123,
+ "name": "John Doe",
+ "email": "[email protected]",
+ "updated_at": "2024-01-15T10:29:50Z"
+ },
+ "old_data": null
+}
+```
+
+## Usage Example
+
+1. Configure the connector in your Iggy connectors runtime
+2. The connector will start polling the specified tables
+3. Data changes will be streamed to the configured Iggy topic
+4. Each row becomes a separate message in JSON format
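
For reference, a complete connector definition for the runtime might look like the sketch below. It assumes the same configuration conventions as the other source connectors; the stream names and plugin path are illustrative:

```toml
type = "source"
key = "postgres"
enabled = true
version = 0
name = "Postgres source"
path = "target/release/libiggy_connector_postgres_source"
plugin_config_format = "toml"

[[streams]]
stream = "postgres_stream"
topic = "users"
schema = "json"
batch_length = 100
linger_time = "5ms"

[plugin_config]
connection_string = "postgresql://username:password@localhost:5432/database"
mode = "polling"
tables = ["users", "orders", "products"]
poll_interval = "30s"
batch_size = 1000
tracking_column = "updated_at"
```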
diff --git a/docs/connectors/source.md b/docs/connectors/sources/source.md
similarity index 97%
rename from docs/connectors/source.md
rename to docs/connectors/sources/source.md
index a259cb6b..251472fd 100644
--- a/docs/connectors/source.md
+++ b/docs/connectors/sources/source.md
@@ -2,7 +2,7 @@
id: connectors-source
slug: /connectors/source
title: Source
-sidebar_position: 5
+sidebar_position: 1
---
## Overview
@@ -31,15 +31,15 @@ Each source connector is configured in its own separate
configuration file withi
```rust
pub struct SourceConfig {
- pub id: String,
+ pub key: String,
pub enabled: bool,
pub version: u64,
pub name: String,
pub path: String,
pub transforms: Option<TransformsConfig>,
pub streams: Vec<StreamProducerConfig>,
- pub config_format: Option<ConfigFormat>,
- pub config: Option<serde_json::Value>,
+ pub plugin_config_format: Option<ConfigFormat>,
+ pub plugin_config: Option<serde_json::Value>,
}
```
@@ -56,7 +56,7 @@ config_dir = "path/to/connectors"
```toml
# Type of connector (sink or source)
type = "source"
-id = "random" # Unique source ID
+key = "random" # Unique source ID
# Required configuration for a source connector
enabled = true # Toggle source on/off
@@ -73,8 +73,8 @@ schema = "json"
batch_length = 100
linger_time = "5ms"
-# Custom configuration for the source connector, deserialized to type T from
`config` field
-[config]
+# Custom configuration for the source connector, deserialized to type T from
`plugin_config` field
+[plugin_config]
messages_count = 10
# Optional data transformation(s) to be applied before sending messages to the
stream
@@ -89,7 +89,7 @@ value.static = "hello"
### Environment Variable Overrides
-Configuration properties can be overridden using environment variables. The
pattern follows: `IGGY_CONNECTORS_SOURCE_[ID]_[PROPERTY]`
+Configuration properties can be overridden using environment variables. The
pattern follows: `IGGY_CONNECTORS_SOURCE_[KEY]_[PROPERTY]`
For example, to override the `enabled` property for a source with key `random`:
diff --git a/docusaurus.config.ts b/docusaurus.config.ts
index f2d675f3..0785983b 100644
--- a/docusaurus.config.ts
+++ b/docusaurus.config.ts
@@ -222,6 +222,7 @@ Apache®, the names of Apache projects, and the feather logo
are either register
prism: {
theme: prismThemes.github,
darkTheme: prismThemes.dracula,
+ additionalLanguages: ["toml", "bash", "rust", "json"],
},
matomo: {
matomoUrl: "https://analytics.apache.org/",