This is an automated email from the ASF dual-hosted git repository.
maciej pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new c5f10191f chore(docs): add java examples and ci tests (#2504)
c5f10191f is described below
commit c5f10191f2fcf54f006fc79443769591a47ce693
Author: Rimuksh Kansal <[email protected]>
AuthorDate: Tue Dec 23 19:35:42 2025 +0900
chore(docs): add java examples and ci tests (#2504)
closes #1893
---
.github/config/components.yml | 81 +++--
.github/workflows/_test_examples.yml | 20 ++
examples/java/.gitignore | 6 +
examples/java/README.md | 142 ++++++++
examples/java/build.gradle.kts | 105 ++++++
examples/java/gradle.properties | 21 ++
{foreign => examples}/java/settings.gradle.kts | 20 +-
.../iggy/examples}/async/AsyncConsumerExample.java | 2 +-
.../apache/iggy/examples}/async/AsyncProducer.java | 7 +-
.../consumer/GettingStartedConsumer.java | 122 +++++++
.../producer/GettingStartedProducer.java | 75 +++-
.../consumer/MessageEnvelopeConsumer.java | 154 ++++++++
.../producer/MessageEnvelopeProducer.java | 139 ++++++++
.../multitenant/consumer/MultiTenantConsumer.java | 378 ++++++++++++++++++++
.../multitenant/producer/MultiTenantProducer.java | 392 +++++++++++++++++++++
.../org/apache/iggy/examples/shared/Messages.java | 99 ++++++
.../iggy/examples/shared/MessagesGenerator.java | 70 ++++
.../sinkdataproducer/SinkDataProducer.java | 183 ++++++++++
.../iggy/examples/streambuilder/StreamBasic.java | 163 +++++++++
foreign/java/examples/build.gradle.kts | 73 ----
.../org/apache/iggy/consumer/SimpleConsumer.java | 110 ------
foreign/java/settings.gradle.kts | 3 -
scripts/run-java-examples-from-readme.sh | 142 ++++++++
23 files changed, 2252 insertions(+), 255 deletions(-)
diff --git a/.github/config/components.yml b/.github/config/components.yml
index bb62b887d..c039f249f 100644
--- a/.github/config/components.yml
+++ b/.github/config/components.yml
@@ -32,29 +32,29 @@ components:
# Core library components that others depend on
rust-sdk:
depends_on:
- - "rust-workspace" # SDK is affected by workspace changes
- - "ci-infrastructure" # CI changes trigger full regression
+ - "rust-workspace" # SDK is affected by workspace changes
+ - "ci-infrastructure" # CI changes trigger full regression
paths:
- "core/sdk/**"
rust-common:
depends_on:
- - "rust-workspace" # Common is affected by workspace changes
- - "ci-infrastructure" # CI changes trigger full regression
+ - "rust-workspace" # Common is affected by workspace changes
+ - "ci-infrastructure" # CI changes trigger full regression
paths:
- "core/common/**"
rust-binary-protocol:
depends_on:
- - "rust-workspace" # Protocol is affected by workspace changes
- - "ci-infrastructure" # CI changes trigger full regression
+ - "rust-workspace" # Protocol is affected by workspace changes
+ - "ci-infrastructure" # CI changes trigger full regression
paths:
- "core/binary_protocol/**"
rust-server:
depends_on:
- - "rust-workspace" # Server is affected by workspace changes
- - "ci-infrastructure" # CI changes trigger full regression
+ - "rust-workspace" # Server is affected by workspace changes
+ - "ci-infrastructure" # CI changes trigger full regression
- "rust-common"
- "rust-binary-protocol"
paths:
@@ -62,8 +62,8 @@ components:
rust-cluster:
depends_on:
- - "rust-workspace" # Cluster is affected by workspace changes
- - "ci-infrastructure" # CI changes trigger full regression
+ - "rust-workspace" # Cluster is affected by workspace changes
+ - "ci-infrastructure" # CI changes trigger full regression
paths:
- "core/consensus/"
- "core/metadata/"
@@ -84,11 +84,11 @@ components:
- "rust-connectors"
- "rust-mcp"
- "rust-integration"
- - "ci-infrastructure" # CI changes trigger full regression
+ - "ci-infrastructure" # CI changes trigger full regression
paths:
- "Dockerfile*"
- - "!foreign/**" # Exclude foreign SDKs
- - "!web/**" # Exclude web UI
+ - "!foreign/**" # Exclude foreign SDKs
+ - "!web/**" # Exclude web UI
tasks:
- "check"
- "fmt"
@@ -143,27 +143,27 @@ components:
sdk-python:
depends_on:
- - "rust-sdk" # Python SDK wraps the Rust SDK
- - "rust-server" # For integration tests
- - "ci-infrastructure" # CI changes trigger full regression
+ - "rust-sdk" # Python SDK wraps the Rust SDK
+ - "rust-server" # For integration tests
+ - "ci-infrastructure" # CI changes trigger full regression
paths:
- "foreign/python/**"
tasks: ["lint", "test", "build"]
sdk-node:
depends_on:
- - "rust-sdk" # Node SDK depends on core SDK
- - "rust-server" # For integration tests
- - "ci-infrastructure" # CI changes trigger full regression
+ - "rust-sdk" # Node SDK depends on core SDK
+ - "rust-server" # For integration tests
+ - "ci-infrastructure" # CI changes trigger full regression
paths:
- "foreign/node/**"
tasks: ["lint", "test", "build", "e2e"]
sdk-go:
depends_on:
- - "rust-sdk" # Go SDK depends on core SDK
- - "rust-server" # For integration tests
- - "ci-infrastructure" # CI changes trigger full regression
+ - "rust-sdk" # Go SDK depends on core SDK
+ - "rust-server" # For integration tests
+ - "ci-infrastructure" # CI changes trigger full regression
paths:
- "foreign/go/**"
- "bdd/go/**"
@@ -172,18 +172,19 @@ components:
sdk-java:
depends_on:
- - "rust-sdk" # Java SDK depends on core SDK
- - "rust-server" # For integration tests
- - "ci-infrastructure" # CI changes trigger full regression
+ - "rust-sdk" # Java SDK depends on core SDK
+ - "rust-server" # For integration tests
+ - "ci-infrastructure" # CI changes trigger full regression
paths:
- "foreign/java/**"
+ - "examples/java/**"
tasks: ["lint", "test", "build"]
sdk-csharp:
depends_on:
- - "rust-sdk" # C# SDK depends on core SDK
- - "rust-server" # For integration tests
- - "ci-infrastructure" # CI changes trigger full regression
+ - "rust-sdk" # C# SDK depends on core SDK
+ - "rust-server" # For integration tests
+ - "ci-infrastructure" # CI changes trigger full regression
paths:
- "foreign/csharp/**"
- "examples/csharp/**"
@@ -203,7 +204,7 @@ components:
bdd-python:
depends_on:
- "rust-server"
- - "rust-sdk" # All SDKs depend on core SDK changes
+ - "rust-sdk" # All SDKs depend on core SDK changes
- "sdk-python"
- "ci-infrastructure"
paths:
@@ -214,7 +215,7 @@ components:
bdd-go:
depends_on:
- "rust-server"
- - "rust-sdk" # All SDKs depend on core SDK changes
+ - "rust-sdk" # All SDKs depend on core SDK changes
- "sdk-go"
- "ci-infrastructure"
paths:
@@ -225,7 +226,7 @@ components:
bdd-node:
depends_on:
- "rust-server"
- - "rust-sdk" # All SDKs depend on core SDK changes
+ - "rust-sdk" # All SDKs depend on core SDK changes
- "sdk-node"
- "ci-infrastructure"
paths:
@@ -236,7 +237,7 @@ components:
bdd-csharp:
depends_on:
- "rust-server"
- - "rust-sdk" # All SDKs depend on core SDK changes
+ - "rust-sdk" # All SDKs depend on core SDK changes
- "sdk-csharp"
- "ci-infrastructure"
paths:
@@ -251,7 +252,8 @@ components:
- "sdk-go"
- "sdk-csharp"
- "sdk-python"
- - "ci-infrastructure" # CI changes trigger full regression
+ - "sdk-java"
+ - "ci-infrastructure" # CI changes trigger full regression
paths:
- "examples/**"
- "scripts/run-rust-examples-from-readme.sh"
@@ -259,7 +261,16 @@ components:
- "scripts/run-csharp-examples-from-readme.sh"
- "scripts/run-python-examples-from-readme.sh"
- "scripts/run-node-examples-from-readme.sh"
- tasks: ["examples-rust", "examples-go", "examples-csharp",
"examples-python", "examples-node"]
+ - "scripts/run-java-examples-from-readme.sh"
+ tasks:
+ [
+ "examples-rust",
+ "examples-go",
+ "examples-csharp",
+ "examples-python",
+ "examples-node",
+ "examples-java",
+ ]
web-ui:
paths:
@@ -285,4 +296,4 @@ components:
- ".github/workflows/**/*.yml"
- ".github/actions/**/*.yml"
- ".github/ci/**/*.yml"
- tasks: ["validate"] # Could run workflow validation
+ tasks: ["validate"] # Could run workflow validation
diff --git a/.github/workflows/_test_examples.yml
b/.github/workflows/_test_examples.yml
index 3e403c8cc..d30e2a068 100644
--- a/.github/workflows/_test_examples.yml
+++ b/.github/workflows/_test_examples.yml
@@ -72,6 +72,20 @@ jobs:
path: ~/.cache/pip
key: pip-${{ runner.os }}-${{
hashFiles('foreign/python/pyproject.toml') }}
+ - name: Setup Java
+   if: inputs.component == 'examples-suite' && inputs.task == 'examples-java'
+   uses: actions/setup-java@v4
+   with:
+     distribution: "temurin"
+     java-version: "17"
+     # Gradle caching is intentionally left to the "Setup Gradle" step below.
+     # Enabling setup-java's `cache: gradle` as well would make both actions
+     # save and restore the same Gradle User Home.
+
+ - name: Setup Gradle
+   if: inputs.component == 'examples-suite' && inputs.task == 'examples-java'
+   uses: gradle/actions/setup-gradle@af1da67850ed9a4cedd57bfd976089dd991e2582 # v4.0.0
+   with:
+     gradle-version: "9.2.1"
+
- name: Build common binaries for all examples
if: inputs.component == 'examples-suite'
run: |
@@ -141,6 +155,12 @@ jobs:
echo "Running Node.js examples tests..."
./scripts/run-node-examples-from-readme.sh
+ - name: Run Java examples
+ if: inputs.component == 'examples-suite' && inputs.task ==
'examples-java'
+ run: |
+ echo "Running Java examples tests..."
+ ./scripts/run-java-examples-from-readme.sh
+
- name: Upload reports
if: always()
uses: actions/upload-artifact@v4
diff --git a/examples/java/.gitignore b/examples/java/.gitignore
new file mode 100644
index 000000000..2bdccb0b4
--- /dev/null
+++ b/examples/java/.gitignore
@@ -0,0 +1,6 @@
+.classpath
+.project
+.settings/
+.gradle/
+build/
+out/
diff --git a/examples/java/README.md b/examples/java/README.md
new file mode 100644
index 000000000..1d89b691b
--- /dev/null
+++ b/examples/java/README.md
@@ -0,0 +1,142 @@
+# Iggy Java Examples
+
+This directory contains comprehensive sample applications that showcase
various usage patterns of the Iggy Java client SDK, from basic operations to
advanced multi-tenant scenarios.
+
+Java 17 and Gradle 9.2.1 are recommended for running the examples.
+
+## Running Examples
+
+Iggy requires valid credentials to authenticate client requests. The examples
assume that the server is using the default root credentials, which can be
enabled in one of two ways:
+
+1. Start the server with default credentials:
+
+ ```bash
+ cargo run --bin iggy-server -- --with-default-root-credentials
+ ```
+
+2. Set the appropriate environment variables before starting the server with
`cargo run --bin iggy-server`:
+
+ macOS/Linux:
+
+ ```bash
+ export IGGY_ROOT_USERNAME=iggy
+ export IGGY_ROOT_PASSWORD=iggy
+ ```
+
+ Windows (PowerShell):
+
+ ```powershell
+ $env:IGGY_ROOT_USERNAME = "iggy"
+ $env:IGGY_ROOT_PASSWORD = "iggy"
+ ```
+
+> **Note** <br>
+> This setup is intended only for development and testing, not production use.
+
+By default, all server data is stored in the `local_data` directory (this can
be changed via `system.path` in `server.toml`).
+
+Root credentials are applied **only on the very first startup**, when no data
directory exists yet.
+Once the server has created and populated the data directory, the existing
stored credentials will always be used, and supplying the
`--with-default-root-credentials` flag or setting the environment variables
will no longer override them.
+
+If the server has already been started once and your example returns `Error:
InvalidCredentials`, then this means the stored credentials differ from the
defaults.
+
+You can reset the credentials in one of two ways:
+
+1. Delete the existing data directory, then start the server again with the
default-credential flag or environment variables.
+2. Use the `--fresh` flag to force a reset:
+
+ ```bash
+ cargo run --bin iggy-server -- --with-default-root-credentials --fresh
+ ```
+
+ This will ignore any existing data directory and re-initialize it with the
default credentials.
+
+For server configuration options and help:
+
+```bash
+cargo run --bin iggy-server -- --help
+```
+
+You can also customize the server using environment variables:
+
+```bash
+## Example: Enable HTTP transport and set custom address
+IGGY_HTTP_ENABLED=true IGGY_TCP_ADDRESS=0.0.0.0:8090 cargo run --bin
iggy-server
+```
+
+## Basic Examples
+
+### Getting Started
+
+A good introduction for newcomers to Iggy:
+
+```bash
+gradle runGettingStartedProducer
+gradle runGettingStartedConsumer
+```
+
+### Message Headers
+
+This example will be added once user header
serialization/deserialization is implemented in the Java client SDK.
+
+### Message Envelopes
+
+JSON envelope pattern for polymorphic message handling:
+
+```bash
+gradle runMessageEnvelopeProducer
+gradle runMessageEnvelopeConsumer
+```
+
+Uses MessagesGenerator to create OrderCreated, OrderConfirmed, and
OrderRejected messages wrapped in JSON envelopes for type identification.
+
+## Advanced Examples
+
+### Multi-Tenant Architecture
+
+Complex example demonstrating enterprise-level isolation:
+
+```bash
+gradle runMultiTenantProducer
+gradle runMultiTenantConsumer
+```
+
+Features multiple tenant setup, user creation with stream-specific
permissions, concurrent producers/consumers across tenants, and security
isolation.
+
+### High-Volume Data Generation
+
+Testing and benchmarking support:
+
+```bash
+gradle runSinkDataProducer
+```
+
+Produces high-throughput data (1000+ messages per batch) with realistic user
records.
+
+## Stream Builder Examples
+
+### Stream Builder
+
+Building streams with advanced configuration:
+
+```bash
+gradle runStreamBasic
+```
+
+Shows how to use the stream builder API to create and configure streams with
custom settings.
+
+## Async Client
+
+The following examples demonstrate how to use the asynchronous client:
+
+Async producer example:
+
+```bash
+gradle runAsyncProducer
+```
+
+Async consumer example:
+
+```bash
+gradle runAsyncConsumerExample
+```
diff --git a/examples/java/build.gradle.kts b/examples/java/build.gradle.kts
new file mode 100644
index 000000000..46a319207
--- /dev/null
+++ b/examples/java/build.gradle.kts
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Build configuration for the standalone Java examples project:
+// Java compilation plus Spotless formatting and Checkstyle.
+plugins {
+ java
+ id("com.diffplug.spotless") version "8.1.0"
+ checkstyle
+}
+
+repositories {
+ mavenCentral()
+}
+
+dependencies {
+ // The `local-dev` coordinate is expected to be substituted with the `:iggy`
+ // project of the included SDK build (see settings.gradle.kts).
+ implementation("org.apache.iggy:iggy:local-dev")
+ // SLF4J binding backing the org.slf4j loggers used by the examples.
+ implementation("org.slf4j:slf4j-simple:2.0.13")
+ // JSON mapping (tools.jackson.databind) used by the message envelope examples.
+ implementation("tools.jackson.core:jackson-databind:3.0.3")
+}
+
+// Source formatting rules applied to all Java sources of the examples.
+spotless {
+ java {
+ palantirJavaFormat()
+ removeUnusedImports()
+ trimTrailingWhitespace()
+ endWithNewline()
+ formatAnnotations()
+ importOrder("", "\n", "javax|java", "\n", "\\#")
+ toggleOffOn()
+ }
+}
+
+checkstyle {
+ toolVersion = "12.2.0"
+}
+
+// Every Java compilation first runs `spotlessApply`, so sources are
+// reformatted in place before they are compiled.
+tasks.withType<JavaCompile> {
+ dependsOn("spotlessApply")
+}
+
+// Run task name -> fully-qualified main class of the example it launches.
+// Each entry becomes a JavaExec task that runs on the main runtime classpath,
+// e.g. `gradle runGettingStartedProducer`.
+val exampleMainClasses = mapOf(
+    "runGettingStartedProducer" to "org.apache.iggy.examples.gettingstarted.producer.GettingStartedProducer",
+    "runGettingStartedConsumer" to "org.apache.iggy.examples.gettingstarted.consumer.GettingStartedConsumer",
+    "runMessageEnvelopeProducer" to "org.apache.iggy.examples.messageenvelope.producer.MessageEnvelopeProducer",
+    "runMessageEnvelopeConsumer" to "org.apache.iggy.examples.messageenvelope.consumer.MessageEnvelopeConsumer",
+    "runSinkDataProducer" to "org.apache.iggy.examples.sinkdataproducer.SinkDataProducer",
+    "runMultiTenantProducer" to "org.apache.iggy.examples.multitenant.producer.MultiTenantProducer",
+    "runMultiTenantConsumer" to "org.apache.iggy.examples.multitenant.consumer.MultiTenantConsumer",
+    "runStreamBasic" to "org.apache.iggy.examples.streambuilder.StreamBasic",
+    "runAsyncProducer" to "org.apache.iggy.examples.async.AsyncProducer",
+    "runAsyncConsumerExample" to "org.apache.iggy.examples.async.AsyncConsumerExample",
+)
+
+exampleMainClasses.forEach { (taskName, exampleMainClass) ->
+    tasks.register<JavaExec>(taskName) {
+        classpath = sourceSets["main"].runtimeClasspath
+        mainClass.set(exampleMainClass)
+    }
+}
diff --git a/examples/java/gradle.properties b/examples/java/gradle.properties
new file mode 100644
index 000000000..177455b3e
--- /dev/null
+++ b/examples/java/gradle.properties
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Turn on Gradle's configuration cache for the examples build.
+org.gradle.configuration-cache=true
+
diff --git a/foreign/java/settings.gradle.kts
b/examples/java/settings.gradle.kts
similarity index 60%
copy from foreign/java/settings.gradle.kts
copy to examples/java/settings.gradle.kts
index 0af0da426..c1f176e56 100644
--- a/foreign/java/settings.gradle.kts
+++ b/examples/java/settings.gradle.kts
@@ -16,18 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
+rootProject.name = "java-examples"
-rootProject.name = "iggy-java-sdk"
-
-include("iggy")
-project(":iggy").projectDir = file("java-sdk")
-
-include("iggy-example")
-project(":iggy-example").projectDir = file("examples")
-
-// External processors - Stream processing integrations
-include("iggy-connector-library")
-project(":iggy-connector-library").projectDir =
file("external-processors/iggy-connector-flink/iggy-connector-library")
-
-include("iggy-flink-examples")
-project(":iggy-flink-examples").projectDir =
file("external-processors/iggy-connector-flink/iggy-flink-examples")
+// Composite build: include the SDK build located in foreign/java and replace
+// any request for the org.apache.iggy:iggy module with that build's :iggy
+// project, so the examples compile against the local SDK sources.
+includeBuild("../../foreign/java") {
+ dependencySubstitution {
+ substitute(module("org.apache.iggy:iggy")).using(project(":iggy"))
+ }
+}
diff --git
a/foreign/java/examples/src/main/java/org/apache/iggy/async/AsyncConsumerExample.java
b/examples/java/src/main/java/org/apache/iggy/examples/async/AsyncConsumerExample.java
similarity index 99%
rename from
foreign/java/examples/src/main/java/org/apache/iggy/async/AsyncConsumerExample.java
rename to
examples/java/src/main/java/org/apache/iggy/examples/async/AsyncConsumerExample.java
index 6ecb12c35..1647e8468 100644
---
a/foreign/java/examples/src/main/java/org/apache/iggy/async/AsyncConsumerExample.java
+++
b/examples/java/src/main/java/org/apache/iggy/examples/async/AsyncConsumerExample.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iggy.async;
+package org.apache.iggy.examples.async;
import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
import org.apache.iggy.client.blocking.tcp.IggyTcpClient;
diff --git
a/foreign/java/examples/src/main/java/org/apache/iggy/async/AsyncProducer.java
b/examples/java/src/main/java/org/apache/iggy/examples/async/AsyncProducer.java
similarity index 97%
rename from
foreign/java/examples/src/main/java/org/apache/iggy/async/AsyncProducer.java
rename to
examples/java/src/main/java/org/apache/iggy/examples/async/AsyncProducer.java
index e41dc720a..642b63069 100644
---
a/foreign/java/examples/src/main/java/org/apache/iggy/async/AsyncProducer.java
+++
b/examples/java/src/main/java/org/apache/iggy/examples/async/AsyncProducer.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iggy.async;
+package org.apache.iggy.examples.async;
import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
import org.apache.iggy.identifier.StreamId;
@@ -29,7 +29,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigInteger;
-import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@@ -53,7 +52,7 @@ public class AsyncProducer {
private static final String STREAM_NAME = "async-test";
private static final String TOPIC_NAME = "events";
- private static final long PARTITION_ID = 1L;
+ private static final long PARTITION_ID = 0L;
private static final int MESSAGE_COUNT = 100;
private static final int MESSAGE_SIZE = 256;
@@ -173,8 +172,6 @@ public class AsyncProducer {
messageContent += " ";
}
- byte[] messageBytes = messageContent.getBytes(StandardCharsets.UTF_8);
-
// Use the factory method to create a message
Message message = Message.of(messageContent);
diff --git
a/examples/java/src/main/java/org/apache/iggy/examples/gettingstarted/consumer/GettingStartedConsumer.java
b/examples/java/src/main/java/org/apache/iggy/examples/gettingstarted/consumer/GettingStartedConsumer.java
new file mode 100644
index 000000000..025a0c63c
--- /dev/null
+++
b/examples/java/src/main/java/org/apache/iggy/examples/gettingstarted/consumer/GettingStartedConsumer.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.examples.gettingstarted.consumer;
+
+import org.apache.iggy.client.blocking.tcp.IggyTcpClient;
+import org.apache.iggy.consumergroup.Consumer;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.PolledMessages;
+import org.apache.iggy.message.PollingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+
+/**
+ * Getting-started consumer example.
+ *
+ * <p>Connects to a server on {@code localhost:8090} with the {@code iggy}/{@code iggy}
+ * credentials and polls partition {@code 0} of {@code sample-stream}/{@code sample-topic}
+ * by offset, in batches of up to {@link #MESSAGES_PER_BATCH} messages. The example exits
+ * after {@link #BATCHES_LIMIT} non-empty batches, on interruption, or on the first
+ * polling error.
+ */
+public final class GettingStartedConsumer {
+
+    private static final StreamId STREAM_ID = StreamId.of("sample-stream");
+    private static final TopicId TOPIC_ID = TopicId.of("sample-topic");
+
+    private static final long PARTITION_ID = 0L;
+
+    /** Number of non-empty batches to consume before exiting. */
+    private static final int BATCHES_LIMIT = 5;
+
+    /** Maximum number of messages requested per poll. */
+    private static final long MESSAGES_PER_BATCH = 10L;
+
+    /** Pause between polls, in milliseconds. */
+    private static final long INTERVAL_MS = 500;
+
+    private static final Logger log = LoggerFactory.getLogger(GettingStartedConsumer.class);
+
+    private GettingStartedConsumer() {}
+
+    public static void main(String[] args) {
+        var client = IggyTcpClient.builder()
+                .host("localhost")
+                .port(8090)
+                .credentials("iggy", "iggy")
+                .build();
+
+        consumeMessages(client);
+    }
+
+    /**
+     * Polls messages starting at offset zero until {@link #BATCHES_LIMIT} non-empty
+     * batches have been handled.
+     *
+     * @param client client used to poll messages
+     */
+    private static void consumeMessages(IggyTcpClient client) {
+        log.info(
+                "Messages will be consumed from stream: {}, topic: {}, partition: {} with interval {}ms.",
+                STREAM_ID,
+                TOPIC_ID,
+                PARTITION_ID,
+                INTERVAL_MS);
+
+        BigInteger offset = BigInteger.ZERO;
+        int consumedBatches = 0;
+
+        Consumer consumer = Consumer.of(0L);
+
+        while (true) {
+            if (consumedBatches == BATCHES_LIMIT) {
+                log.info("Consumed {} batches of messages, exiting.", consumedBatches);
+                return;
+            }
+
+            try {
+                PolledMessages polledMessages = client.messages()
+                        .pollMessages(
+                                STREAM_ID,
+                                TOPIC_ID,
+                                Optional.of(PARTITION_ID),
+                                consumer,
+                                PollingStrategy.offset(offset),
+                                MESSAGES_PER_BATCH,
+                                false);
+
+                if (polledMessages.messages().isEmpty()) {
+                    log.info("No messages found.");
+                    Thread.sleep(INTERVAL_MS);
+                    continue;
+                }
+
+                // Give every message its own offset (batch start + position in the
+                // batch) instead of reporting the batch start offset for all of them.
+                BigInteger messageOffset = offset;
+                for (Message message : polledMessages.messages()) {
+                    handleMessage(message, messageOffset);
+                    messageOffset = messageOffset.add(BigInteger.ONE);
+                }
+
+                consumedBatches++;
+
+                // The next poll starts right after the last handled message.
+                offset = messageOffset;
+
+                Thread.sleep(INTERVAL_MS);
+
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so callers can observe the interruption.
+                Thread.currentThread().interrupt();
+                break;
+            } catch (Exception e) {
+                log.error("Error polling messages", e);
+                break;
+            }
+        }
+    }
+
+    /**
+     * Logs the payload of a single message, decoded as UTF-8.
+     *
+     * @param message polled message
+     * @param offset offset of this message within the partition
+     */
+    private static void handleMessage(Message message, BigInteger offset) {
+        String payload = new String(message.payload(), StandardCharsets.UTF_8);
+        log.info("Handling message at offset {}, payload: {}...", offset, payload);
+    }
+}
diff --git
a/foreign/java/examples/src/main/java/org/apache/iggy/producer/SimpleProducer.java
b/examples/java/src/main/java/org/apache/iggy/examples/gettingstarted/producer/GettingStartedProducer.java
similarity index 53%
rename from
foreign/java/examples/src/main/java/org/apache/iggy/producer/SimpleProducer.java
rename to
examples/java/src/main/java/org/apache/iggy/examples/gettingstarted/producer/GettingStartedProducer.java
index 5688f3859..0c0f37159 100644
---
a/foreign/java/examples/src/main/java/org/apache/iggy/producer/SimpleProducer.java
+++
b/examples/java/src/main/java/org/apache/iggy/examples/gettingstarted/producer/GettingStartedProducer.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iggy.producer;
+package org.apache.iggy.examples.gettingstarted.producer;
import org.apache.iggy.client.blocking.tcp.IggyTcpClient;
import org.apache.iggy.identifier.StreamId;
@@ -31,33 +31,77 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;
-import static java.util.Collections.singletonList;
import static java.util.Optional.empty;
-public final class SimpleProducer {
- private static final String STREAM_NAME = "dev01";
+public final class GettingStartedProducer {
+
+ private static final String STREAM_NAME = "sample-stream";
private static final StreamId STREAM_ID = StreamId.of(STREAM_NAME);
- private static final String TOPIC_NAME = "events";
+
+ private static final String TOPIC_NAME = "sample-topic";
private static final TopicId TOPIC_ID = TopicId.of(TOPIC_NAME);
- private static final Logger log =
LoggerFactory.getLogger(SimpleProducer.class);
- private SimpleProducer() {}
+ private static final long PARTITION_ID = 0L;
+
+ private static final int BATCHES_LIMIT = 5;
+
+ private static final int MESSAGES_PER_BATCH = 10;
+ private static final long INTERVAL_MS = 500;
+
+ private static final Logger log =
LoggerFactory.getLogger(GettingStartedProducer.class);
+
+ private GettingStartedProducer() {}
public static void main(String[] args) {
- var client = new IggyTcpClient("localhost", 8090);
- client.users().login("iggy", "iggy");
+ var client = IggyTcpClient.builder()
+ .host("localhost")
+ .port(8090)
+ .credentials("iggy", "iggy")
+ .build();
createStream(client);
createTopic(client);
+ produceMessages(client);
+ }
- int counter = 0;
- while (counter++ < 1000) {
- var message = Message.of("message from simple producer " +
counter);
- client.messages().sendMessages(STREAM_ID, TOPIC_ID,
Partitioning.balanced(), singletonList(message));
- log.debug("Message {} sent", counter);
+ private static void produceMessages(IggyTcpClient client) {
+ log.info(
+ "Messages will be sent to stream: {}, topic: {}, partition: {}
with interval {}ms.",
+ STREAM_NAME,
+ TOPIC_NAME,
+ PARTITION_ID,
+ INTERVAL_MS);
+
+ int currentId = 0;
+ int sentBatches = 0;
+
+ Partitioning partitioning = Partitioning.partitionId(PARTITION_ID);
+
+ while (sentBatches < BATCHES_LIMIT) {
+ try {
+ Thread.sleep(INTERVAL_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+
+ List<Message> messages = new ArrayList<>();
+ for (int i = 0; i < MESSAGES_PER_BATCH; i++) {
+ currentId++;
+ String payload = "message-" + currentId;
+ messages.add(Message.of(payload));
+ }
+
+ client.messages().sendMessages(STREAM_ID, TOPIC_ID, partitioning,
messages);
+ sentBatches++;
+ log.info("Sent {} message(s).", MESSAGES_PER_BATCH);
}
+
+ log.info("Sent {} batches of messages, exiting.", sentBatches);
}
private static void createStream(IggyTcpClient client) {
@@ -66,11 +110,13 @@ public final class SimpleProducer {
return;
}
client.streams().createStream(STREAM_NAME);
+ log.info("Stream {} was created.", STREAM_NAME);
}
private static void createTopic(IggyTcpClient client) {
Optional<TopicDetails> topic = client.topics().getTopic(STREAM_ID,
TOPIC_ID);
if (topic.isPresent()) {
+ log.warn("Topic already exists and will not be created again.");
return;
}
client.topics()
@@ -82,5 +128,6 @@ public final class SimpleProducer {
BigInteger.ZERO,
empty(),
TOPIC_NAME);
+ log.info("Topic {} was created.", TOPIC_NAME);
}
}
diff --git
a/examples/java/src/main/java/org/apache/iggy/examples/messageenvelope/consumer/MessageEnvelopeConsumer.java
b/examples/java/src/main/java/org/apache/iggy/examples/messageenvelope/consumer/MessageEnvelopeConsumer.java
new file mode 100644
index 000000000..fc20eef51
--- /dev/null
+++
b/examples/java/src/main/java/org/apache/iggy/examples/messageenvelope/consumer/MessageEnvelopeConsumer.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.examples.messageenvelope.consumer;
+
+import org.apache.iggy.client.blocking.tcp.IggyTcpClient;
+import org.apache.iggy.consumergroup.Consumer;
+import org.apache.iggy.examples.shared.Messages;
+import org.apache.iggy.examples.shared.Messages.Envelope;
+import org.apache.iggy.examples.shared.Messages.OrderConfirmed;
+import org.apache.iggy.examples.shared.Messages.OrderCreated;
+import org.apache.iggy.examples.shared.Messages.OrderRejected;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.PolledMessages;
+import org.apache.iggy.message.PollingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import tools.jackson.databind.ObjectMapper;
+import tools.jackson.databind.json.JsonMapper;
+
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+
+ public final class MessageEnvelopeConsumer {
+    // Example consumer: polls JSON envelopes from a single partition by explicit offset and
+    // dispatches on the envelope's message type.
+
+    private static final String STREAM_NAME = "envelope-stream";
+    private static final StreamId STREAM_ID = StreamId.of(STREAM_NAME);
+
+    private static final String TOPIC_NAME = "envelope-topic";
+    private static final TopicId TOPIC_ID = TopicId.of(TOPIC_NAME);
+
+    private static final long PARTITION_ID = 0L;
+    private static final int BATCHES_LIMIT = 10;
+    private static final long MESSAGES_PER_BATCH = 1L;
+    private static final long INTERVAL_MS = 1;
+
+    private static final Logger log = LoggerFactory.getLogger(MessageEnvelopeConsumer.class);
+
+    private static final ObjectMapper MAPPER = JsonMapper.builder().build();
+
+    private MessageEnvelopeConsumer() {}
+
+    /**
+     * Connects to the local server with the default credentials and consumes
+     * {@code BATCHES_LIMIT} non-empty batches.
+     *
+     * @param args unused
+     */
+    public static void main(final String[] args) {
+        var client = IggyTcpClient.builder()
+                .host("localhost")
+                .port(8090)
+                .credentials("iggy", "iggy")
+                .build();
+
+        consumeMessages(client);
+    }
+
+    /**
+     * Polls the partition starting at offset 0 and advances the offset by the number of
+     * messages received. Stops after {@code BATCHES_LIMIT} non-empty batches, on interrupt,
+     * or on the first polling error.
+     *
+     * @param client connected client used for polling
+     */
+    private static void consumeMessages(IggyTcpClient client) {
+        log.info(
+                "Messages will be consumed from stream: {}, topic: {}, partition: {} with interval {}ms.",
+                STREAM_ID,
+                TOPIC_ID,
+                PARTITION_ID,
+                INTERVAL_MS);
+
+        BigInteger offset = BigInteger.ZERO;
+        int consumedBatches = 0;
+
+        Consumer consumer = Consumer.of(0L);
+
+        while (true) {
+            if (consumedBatches == BATCHES_LIMIT) {
+                log.info("Consumed {} batches of messages, exiting.", consumedBatches);
+                return;
+            }
+
+            try {
+                PolledMessages polledMessages = client.messages()
+                        .pollMessages(
+                                STREAM_ID,
+                                TOPIC_ID,
+                                Optional.of(PARTITION_ID),
+                                consumer,
+                                PollingStrategy.offset(offset),
+                                MESSAGES_PER_BATCH,
+                                false);
+
+                if (polledMessages.messages().isEmpty()) {
+                    log.info("No messages found.");
+                    Thread.sleep(INTERVAL_MS);
+                    continue;
+                }
+
+                for (Message message : polledMessages.messages()) {
+                    // Report the message's own offset; the batch start offset is only
+                    // correct for the first message of a multi-message batch.
+                    handleMessage(message, message.header().offset());
+                }
+
+                consumedBatches++;
+
+                // Auto-commit is disabled in the poll above, so the position is tracked locally.
+                offset = offset.add(BigInteger.valueOf(polledMessages.messages().size()));
+
+                Thread.sleep(INTERVAL_MS);
+
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                break;
+            } catch (Exception e) {
+                log.error("Error polling messages", e);
+                break;
+            }
+        }
+    }
+
+    /**
+     * Decodes the payload as a JSON envelope and logs the typed order event it carries.
+     * Decoding failures are logged and do not stop consumption.
+     *
+     * @param message polled message whose payload is a UTF-8 JSON envelope
+     * @param offset  offset reported in the log lines for this message
+     */
+    private static void handleMessage(Message message, BigInteger offset) {
+        String json = new String(message.payload(), StandardCharsets.UTF_8);
+        String messageType = "unknown";
+        try {
+            Envelope envelope = MAPPER.readValue(json, Envelope.class);
+            messageType = envelope.messageType();
+            log.info("Handling message type: {} at offset: {}...", messageType, offset);
+
+            switch (messageType) {
+                case Messages.ORDER_CREATED_TYPE -> {
+                    OrderCreated order = MAPPER.readValue(envelope.payload(), OrderCreated.class);
+                    log.info("{}", order);
+                }
+                case Messages.ORDER_CONFIRMED_TYPE -> {
+                    OrderConfirmed order = MAPPER.readValue(envelope.payload(), OrderConfirmed.class);
+                    log.info("{}", order);
+                }
+                case Messages.ORDER_REJECTED_TYPE -> {
+                    OrderRejected order = MAPPER.readValue(envelope.payload(), OrderRejected.class);
+                    log.info("{}", order);
+                }
+                default -> log.warn("Received unknown message type: {}", messageType);
+            }
+        } catch (Exception e) {
+            log.error("Failed to handle message type {} at offset {}", messageType, offset, e);
+        }
+    }
+}
diff --git
a/examples/java/src/main/java/org/apache/iggy/examples/messageenvelope/producer/MessageEnvelopeProducer.java
b/examples/java/src/main/java/org/apache/iggy/examples/messageenvelope/producer/MessageEnvelopeProducer.java
new file mode 100644
index 000000000..8feab9bd1
--- /dev/null
+++
b/examples/java/src/main/java/org/apache/iggy/examples/messageenvelope/producer/MessageEnvelopeProducer.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.examples.messageenvelope.producer;
+
+import org.apache.iggy.client.blocking.tcp.IggyTcpClient;
+import org.apache.iggy.examples.shared.Messages.SerializableMessage;
+import org.apache.iggy.examples.shared.MessagesGenerator;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.MessageHeader;
+import org.apache.iggy.message.MessageId;
+import org.apache.iggy.message.Partitioning;
+import org.apache.iggy.stream.StreamDetails;
+import org.apache.iggy.topic.CompressionAlgorithm;
+import org.apache.iggy.topic.TopicDetails;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+public final class MessageEnvelopeProducer {
+ private static final String STREAM_NAME = "envelope-stream";
+ private static final StreamId STREAM_ID = StreamId.of(STREAM_NAME);
+
+ private static final String TOPIC_NAME = "envelope-topic";
+ private static final TopicId TOPIC_ID = TopicId.of(TOPIC_NAME);
+
+ private static final long PARTITION_ID = 0L;
+ private static final int BATCHES_LIMIT = 10;
+ private static final int MESSAGES_PER_BATCH = 1;
+ private static final long INTERVAL_MS = 1;
+
+ private static final Logger log =
LoggerFactory.getLogger(MessageEnvelopeProducer.class);
+
+ private MessageEnvelopeProducer() {}
+
+ public static void main(String[] args) {
+ var client = IggyTcpClient.builder()
+ .host("localhost")
+ .port(8090)
+ .credentials("iggy", "iggy")
+ .build();
+
+ Optional<StreamDetails> stream = client.streams().getStream(STREAM_ID);
+ if (stream.isPresent()) {
+ log.warn("Stream {} already exists and will not be created
again.", STREAM_NAME);
+ } else {
+ client.streams().createStream(STREAM_NAME);
+ log.info("Stream {} was created.", STREAM_NAME);
+ }
+
+ Optional<TopicDetails> topic = client.topics().getTopic(STREAM_ID,
TOPIC_ID);
+ if (topic.isPresent()) {
+ log.warn("Topic already exists and will not be created again.");
+ } else {
+ client.topics()
+ .createTopic(
+ STREAM_ID,
+ 1L,
+ CompressionAlgorithm.None,
+ BigInteger.ZERO,
+ BigInteger.ZERO,
+ Optional.empty(),
+ TOPIC_NAME);
+ log.info("Topic {} was created.", TOPIC_NAME);
+ }
+
+ produceMessages(client);
+ }
+
+ public static void produceMessages(IggyTcpClient client) {
+ log.info(
+ "Messages will be sent to stream: {}, topic: {}, partition: {}
with interval {}ms.",
+ STREAM_NAME,
+ TOPIC_NAME,
+ PARTITION_ID,
+ INTERVAL_MS);
+
+ int sentBatches = 0;
+ Partitioning partitioning = Partitioning.partitionId(PARTITION_ID);
+ MessagesGenerator generator = new MessagesGenerator();
+
+ while (sentBatches < BATCHES_LIMIT) {
+ try {
+ Thread.sleep(INTERVAL_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+
+ List<Message> messages = new ArrayList<>();
+ List<SerializableMessage> serializableMessages = new ArrayList<>();
+
+ for (int i = 0; i < MESSAGES_PER_BATCH; i++) {
+ SerializableMessage serializableMessage = generator.generate();
+ String json = serializableMessage.toJsonEnvelope();
+ byte[] payload = json.getBytes(StandardCharsets.UTF_8);
+
+ MessageHeader header = new MessageHeader(
+ BigInteger.ZERO,
+ MessageId.serverGenerated(),
+ BigInteger.ZERO,
+ BigInteger.ZERO,
+ BigInteger.ZERO,
+ 0L,
+ (long) payload.length);
+ Message message = new Message(header, payload,
Optional.empty());
+ messages.add(message);
+ serializableMessages.add(serializableMessage);
+ }
+
+ client.messages().sendMessages(STREAM_ID, TOPIC_ID, partitioning,
messages);
+ sentBatches++;
+ log.info("Sent messages: {}", serializableMessages);
+ }
+ }
+}
diff --git
a/examples/java/src/main/java/org/apache/iggy/examples/multitenant/consumer/MultiTenantConsumer.java
b/examples/java/src/main/java/org/apache/iggy/examples/multitenant/consumer/MultiTenantConsumer.java
new file mode 100644
index 000000000..13d8c1175
--- /dev/null
+++
b/examples/java/src/main/java/org/apache/iggy/examples/multitenant/consumer/MultiTenantConsumer.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.examples.multitenant.consumer;
+
+import org.apache.iggy.client.blocking.tcp.IggyTcpClient;
+import org.apache.iggy.consumergroup.Consumer;
+import org.apache.iggy.consumergroup.ConsumerGroupDetails;
+import org.apache.iggy.identifier.ConsumerId;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.PolledMessages;
+import org.apache.iggy.message.PollingStrategy;
+import org.apache.iggy.stream.StreamDetails;
+import org.apache.iggy.topic.TopicDetails;
+import org.apache.iggy.user.GlobalPermissions;
+import org.apache.iggy.user.Permissions;
+import org.apache.iggy.user.StreamPermissions;
+import org.apache.iggy.user.TopicPermissions;
+import org.apache.iggy.user.UserInfo;
+import org.apache.iggy.user.UserInfoDetails;
+import org.apache.iggy.user.UserStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+ public final class MultiTenantConsumer {
+    // Example: each tenant gets its own restricted user and client, and consumes every topic
+    // of its own stream through a consumer group.
+
+    private static final Logger log = LoggerFactory.getLogger(MultiTenantConsumer.class);
+
+    private static final String[] TOPICS = {"events", "logs", "notifications"};
+    private static final String CONSUMER_GROUP = "multi-tenant";
+    private static final String PASSWORD = "secret";
+
+    private static final int TENANTS_COUNT = 3;
+    private static final int CONSUMERS_COUNT = 1;
+    private static final boolean ENSURE_ACCESS = true;
+    private static final long MESSAGE_BATCHES_LIMIT = 10L;
+    private static final long MESSAGES_PER_BATCH = 1L;
+    private static final long POLL_INTERVAL_MS = 1L;
+
+    private static final String ADDRESS = "127.0.0.1:8090";
+    private static final String ROOT_USERNAME = "iggy";
+    private static final String ROOT_PASSWORD = "iggy";
+
+    private MultiTenantConsumer() {}
+
+    /**
+     * Creates one user and client per tenant, optionally verifies stream isolation, then runs
+     * all tenant consumers concurrently until each reaches {@code MESSAGE_BATCHES_LIMIT}.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) {
+        log.info("Multi-tenant consumer has started, tenants: {}, consumers: {}", TENANTS_COUNT, CONSUMERS_COUNT);
+
+        HostAndPort hostAndPort = parseAddress(ADDRESS);
+        IggyTcpClient rootClient = IggyTcpClient.builder()
+                .host(hostAndPort.host())
+                .port(hostAndPort.port())
+                .credentials(ROOT_USERNAME, ROOT_PASSWORD)
+                .build();
+
+        log.info("Creating users with topic permissions for each tenant");
+        for (int i = 1; i <= TENANTS_COUNT; i++) {
+            createUser(rootClient, tenantStream(i), tenantUser(i));
+        }
+
+        log.info("Creating clients for each tenant");
+        // Tenants are built by index rather than by iterating a HashMap, so tenant id N
+        // always belongs to "tenant_N_stream".
+        List<Tenant> tenants = new ArrayList<>();
+        for (int i = 1; i <= TENANTS_COUNT; i++) {
+            String user = tenantUser(i);
+            IggyTcpClient client = IggyTcpClient.builder()
+                    .host(hostAndPort.host())
+                    .port(hostAndPort.port())
+                    .credentials(user, PASSWORD)
+                    .build();
+            tenants.add(Tenant.newTenant(i, tenantStream(i), user, client));
+        }
+
+        if (ENSURE_ACCESS) {
+            log.info("Ensuring access to topics for each tenant");
+            for (Tenant tenant : tenants) {
+                List<String> unavailable = tenants.stream()
+                        .map(Tenant::stream)
+                        .filter(stream -> !stream.equals(tenant.stream()))
+                        .toList();
+                ensureStreamTopicsAccess(tenant.client(), tenant.stream(), unavailable);
+            }
+        }
+
+        log.info("Creating {} consumer(s) for each tenant", CONSUMERS_COUNT);
+        for (Tenant tenant : tenants) {
+            List<TenantConsumer> consumers = createConsumers(tenant.client(), CONSUMERS_COUNT, tenant.stream());
+            tenant.addConsumers(consumers);
+            log.info(
+                    "Created {} consumer(s) for tenant stream: {}, username: {}",
+                    CONSUMERS_COUNT,
+                    tenant.stream(),
+                    tenant.user());
+        }
+
+        log.info("Starting {} consumer(s) for each tenant", CONSUMERS_COUNT);
+        ExecutorService executor = Executors.newCachedThreadPool();
+        try {
+            List<Future<?>> tasks = new ArrayList<>();
+            for (Tenant tenant : tenants) {
+                for (TenantConsumer consumer : tenant.consumers()) {
+                    tasks.add(executor.submit(() -> consume(tenant.id(), consumer)));
+                }
+            }
+            waitFor(tasks);
+        } finally {
+            // Always request shutdown, even if submitting or waiting fails.
+            executor.shutdown();
+        }
+        log.info("Finished consuming messages for all tenants");
+    }
+
+    /** Stream name for the 1-based tenant number. */
+    private static String tenantStream(int tenantNumber) {
+        return "tenant_" + tenantNumber + "_stream";
+    }
+
+    /** Consumer user name for the 1-based tenant number. */
+    private static String tenantUser(int tenantNumber) {
+        return "tenant_" + tenantNumber + "_consumer";
+    }
+
+    /**
+     * Joins the consumer group and polls with auto-commit until
+     * {@code MESSAGE_BATCHES_LIMIT} non-empty batches were processed.
+     * Errors are logged and end this consumer only.
+     */
+    private static void consume(int tenantId, TenantConsumer tenantConsumer) {
+        long batchesProcessed = 0;
+        try {
+            tenantConsumer
+                    .client()
+                    .consumerGroups()
+                    .joinConsumerGroup(
+                            tenantConsumer.streamId(),
+                            tenantConsumer.topicId(),
+                            tenantConsumer.consumer().id());
+
+            while (batchesProcessed < MESSAGE_BATCHES_LIMIT) {
+                PolledMessages polledMessages = tenantConsumer
+                        .client()
+                        .messages()
+                        .pollMessages(
+                                tenantConsumer.streamId(),
+                                tenantConsumer.topicId(),
+                                Optional.empty(),
+                                tenantConsumer.consumer(),
+                                PollingStrategy.next(),
+                                MESSAGES_PER_BATCH,
+                                true);
+
+                if (polledMessages.messages().isEmpty()) {
+                    // Empty polls do not count towards the batch limit.
+                    TimeUnit.MILLISECONDS.sleep(POLL_INTERVAL_MS);
+                    continue;
+                }
+
+                for (Message message : polledMessages.messages()) {
+                    String payload = new String(message.payload(), StandardCharsets.UTF_8);
+                    log.info(
+                            "Tenant: {} consumer: {} received: {} from partition: {}, topic: {}, stream: {}, at offset: {}, current offset: {}",
+                            tenantId,
+                            tenantConsumer.id(),
+                            payload,
+                            polledMessages.partitionId(),
+                            tenantConsumer.topic(),
+                            tenantConsumer.stream(),
+                            message.header().offset(),
+                            polledMessages.currentOffset());
+                }
+                batchesProcessed++;
+                TimeUnit.MILLISECONDS.sleep(POLL_INTERVAL_MS);
+            }
+
+            log.info(
+                    "Tenant: {} consumer: {} reached message batches limit: {}, stopping.",
+                    tenantId,
+                    tenantConsumer.id(),
+                    MESSAGE_BATCHES_LIMIT);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } catch (Exception error) {
+            log.error(
+                    "Error while consuming messages by tenant: {}, consumer: {}, topic: {}, stream: {}",
+                    tenantId,
+                    tenantConsumer.id(),
+                    tenantConsumer.topic(),
+                    tenantConsumer.stream(),
+                    error);
+        }
+    }
+
+    /**
+     * Ensures the consumer group exists on every topic of {@code stream} and returns
+     * {@code consumersCount} group consumers per topic, all sharing {@code client}.
+     */
+    private static List<TenantConsumer> createConsumers(IggyTcpClient client, int consumersCount, String stream) {
+        List<TenantConsumer> consumers = new ArrayList<>();
+        StreamId streamId = StreamId.of(stream);
+
+        for (String topic : TOPICS) {
+            TopicId topicId = TopicId.of(topic);
+            ensureConsumerGroup(client, streamId, topicId);
+
+            for (int id = 1; id <= consumersCount; id++) {
+                Consumer consumer = Consumer.group(ConsumerId.of(CONSUMER_GROUP));
+                consumers.add(TenantConsumer.newConsumer(id, stream, topic, streamId, topicId, consumer, client));
+            }
+        }
+
+        return consumers;
+    }
+
+    /** Creates the shared consumer group on the topic unless a lookup already finds it. */
+    private static void ensureConsumerGroup(IggyTcpClient client, StreamId streamId, TopicId topicId) {
+        Optional<ConsumerGroupDetails> consumerGroup =
+                client.consumerGroups().getConsumerGroup(streamId, topicId, ConsumerId.of(CONSUMER_GROUP));
+        if (consumerGroup.isPresent()) {
+            return;
+        }
+        client.consumerGroups().createConsumerGroup(streamId, topicId, CONSUMER_GROUP);
+        log.info(
+                "Created consumer group: {} for stream: {}, topic: {}",
+                CONSUMER_GROUP,
+                streamId.getName(),
+                topicId.getName());
+    }
+
+    /**
+     * Creates {@code username} with permissions scoped to {@code streamName} and its topics,
+     * unless a user with that name already exists.
+     *
+     * @throws IllegalStateException if the stream or one of the topics does not exist
+     */
+    private static void createUser(IggyTcpClient client, String streamName, String username) {
+        if (userExists(client, username)) {
+            log.info("User: {} already exists, skipping creation", username);
+            return;
+        }
+
+        StreamId streamId = StreamId.of(streamName);
+        StreamDetails stream = client.streams()
+                .getStream(streamId)
+                .orElseThrow(() -> new IllegalStateException("Stream does not exist: " + streamName));
+
+        Map<Long, TopicPermissions> topicPermissions = new HashMap<>();
+        for (String topic : TOPICS) {
+            TopicDetails topicDetails = client.topics()
+                    .getTopic(streamId, TopicId.of(topic))
+                    .orElseThrow(() -> new IllegalStateException("Topic does not exist: " + topic));
+            topicPermissions.put(topicDetails.id(), new TopicPermissions(false, true, true, false));
+        }
+
+        Map<Long, StreamPermissions> streamPermissions = new HashMap<>();
+        streamPermissions.put(
+                stream.id(), new StreamPermissions(false, true, false, true, true, false, topicPermissions));
+
+        Permissions permissions = new Permissions(
+                new GlobalPermissions(false, false, false, false, false, false, false, false, false, false),
+                streamPermissions);
+
+        UserInfoDetails user =
+                client.users().createUser(username, PASSWORD, UserStatus.Active, Optional.of(permissions));
+        log.info(
+                "Created user: {} with ID: {}, with permissions for topics: {} in stream: {}",
+                username,
+                user.id(),
+                List.of(TOPICS),
+                streamName);
+    }
+
+    /** Returns true if the user list contains {@code username}; a failed lookup is logged and counts as "not found". */
+    private static boolean userExists(IggyTcpClient client, String username) {
+        try {
+            for (UserInfo user : client.users().getUsers()) {
+                if (user.username().equals(username)) {
+                    return true;
+                }
+            }
+        } catch (Exception e) {
+            log.debug("Unable to check if user {} exists: {}", username, e.getMessage());
+        }
+        return false;
+    }
+
+    /**
+     * Verifies that {@code client} can read every topic of its own stream and none of the
+     * topics in {@code unavailableStreams}.
+     *
+     * @throws IllegalStateException if an own topic is not readable, or a foreign topic is
+     */
+    private static void ensureStreamTopicsAccess(
+            IggyTcpClient client, String availableStream, List<String> unavailableStreams) {
+        StreamId availableStreamId = StreamId.of(availableStream);
+        for (String topic : TOPICS) {
+            TopicId topicId = TopicId.of(topic);
+            Optional<TopicDetails> topicDetails = client.topics().getTopic(availableStreamId, topicId);
+            if (topicDetails.isEmpty()) {
+                throw new IllegalStateException("No access to topic: " + topic + " in stream: " + availableStream);
+            }
+            log.info("Ensured access to topic: {} in stream: {}", topic, availableStream);
+        }
+
+        for (String otherStream : unavailableStreams) {
+            StreamId forbiddenStreamId = StreamId.of(otherStream);
+            for (String topic : TOPICS) {
+                TopicId topicId = TopicId.of(topic);
+                Optional<TopicDetails> forbidden;
+                try {
+                    forbidden = client.topics().getTopic(forbiddenStreamId, topicId);
+                } catch (Exception e) {
+                    // A failed lookup is treated as the expected denial.
+                    log.info("Ensured no access to topic: {} in stream: {} ({})", topic, otherStream, e.getMessage());
+                    continue;
+                }
+                // Checked outside the try so the violation is not swallowed by the catch above.
+                if (forbidden.isPresent()) {
+                    throw new IllegalStateException(
+                            "Access to topic: " + topic + " in stream: " + otherStream + " should not be allowed");
+                }
+                log.info("Ensured no access to topic: {} in stream: {}", topic, otherStream);
+            }
+        }
+    }
+
+    /** Blocks until every task finished; task failures are logged, an interrupt stops waiting. */
+    private static void waitFor(List<Future<?>> tasks) {
+        for (Future<?> task : tasks) {
+            try {
+                task.get();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                break;
+            } catch (Exception e) {
+                log.warn("Consumer task failed: {}", e.getMessage());
+            }
+        }
+    }
+
+    /** Splits "host:port"; a blank host falls back to "localhost", a missing or invalid port to 8090. */
+    private static HostAndPort parseAddress(String address) {
+        String[] parts = address.split(":", 2);
+        String host = parts.length > 0 && !parts[0].isBlank() ? parts[0] : "localhost";
+        int port = 8090;
+        if (parts.length == 2) {
+            try {
+                port = Integer.parseInt(parts[1]);
+            } catch (NumberFormatException ignored) {
+                log.warn("Invalid port in address, defaulting to 8090");
+            }
+        }
+        return new HostAndPort(host, port);
+    }
+
+    private record HostAndPort(String host, int port) {}
+
+    // The consumers list is mutable on purpose: it is filled after the tenant is created.
+    private record Tenant(int id, String stream, String user, IggyTcpClient client, List<TenantConsumer> consumers) {
+        static Tenant newTenant(int id, String stream, String user, IggyTcpClient client) {
+            return new Tenant(id, stream, user, client, new ArrayList<>());
+        }
+
+        void addConsumers(List<TenantConsumer> newConsumers) {
+            this.consumers.addAll(newConsumers);
+        }
+    }
+
+    private record TenantConsumer(
+            int id,
+            String stream,
+            String topic,
+            StreamId streamId,
+            TopicId topicId,
+            Consumer consumer,
+            IggyTcpClient client) {
+        static TenantConsumer newConsumer(
+                int id,
+                String stream,
+                String topic,
+                StreamId streamId,
+                TopicId topicId,
+                Consumer consumer,
+                IggyTcpClient client) {
+            return new TenantConsumer(id, stream, topic, streamId, topicId, consumer, client);
+        }
+    }
+}
diff --git
a/examples/java/src/main/java/org/apache/iggy/examples/multitenant/producer/MultiTenantProducer.java
b/examples/java/src/main/java/org/apache/iggy/examples/multitenant/producer/MultiTenantProducer.java
new file mode 100644
index 000000000..1dd41a51f
--- /dev/null
+++
b/examples/java/src/main/java/org/apache/iggy/examples/multitenant/producer/MultiTenantProducer.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.examples.multitenant.producer;
+
+import org.apache.iggy.client.blocking.tcp.IggyTcpClient;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.Partitioning;
+import org.apache.iggy.stream.StreamDetails;
+import org.apache.iggy.topic.CompressionAlgorithm;
+import org.apache.iggy.topic.TopicDetails;
+import org.apache.iggy.user.GlobalPermissions;
+import org.apache.iggy.user.Permissions;
+import org.apache.iggy.user.StreamPermissions;
+import org.apache.iggy.user.UserInfo;
+import org.apache.iggy.user.UserInfoDetails;
+import org.apache.iggy.user.UserStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+public final class MultiTenantProducer {
+
+ private static final Logger log =
LoggerFactory.getLogger(MultiTenantProducer.class);
+
+ private static final String[] TOPICS = {"events", "logs", "notifications"};
+ private static final String PASSWORD = "secret";
+
+ private MultiTenantProducer() {} // private and empty: the class is used only through its static methods
+
+    /**
+     * Creates a stream, user and client per tenant, optionally verifies stream isolation,
+     * then runs all tenant producers concurrently until each has sent its batches.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) {
+        int tenantsCount = 3;
+        int producersCount = 3;
+        int partitionsCount = 3;
+        boolean ensureAccess = true;
+        long batchesLimit = 10L;
+        int messagesPerBatch = 1;
+        long intervalMs = 1L;
+
+        String address = "127.0.0.1:8090";
+        String rootUsername = "iggy";
+        String rootPassword = "iggy";
+
+        log.info(
+                "Multi-tenant producer has started, tenants: {}, producers: {}, partitions: {}",
+                tenantsCount,
+                producersCount,
+                partitionsCount);
+
+        HostAndPort hostAndPort = parseAddress(address);
+        IggyTcpClient rootClient = IggyTcpClient.builder()
+                .host(hostAndPort.host())
+                .port(hostAndPort.port())
+                .credentials(rootUsername, rootPassword)
+                .build();
+
+        log.info("Creating streams and users with permissions for each tenant");
+        // Names are kept in index order (not in a HashMap) so tenant id N always maps to
+        // "tenant_N_stream" / "tenant_N_producer".
+        String[] streams = new String[tenantsCount];
+        String[] users = new String[tenantsCount];
+        for (int i = 0; i < tenantsCount; i++) {
+            String tenantPrefix = "tenant_" + (i + 1);
+            streams[i] = tenantPrefix + "_stream";
+            users[i] = tenantPrefix + "_producer";
+            createStreamAndUser(rootClient, streams[i], users[i]);
+        }
+
+        log.info("Creating clients for each tenant");
+        List<Tenant> tenants = new ArrayList<>();
+        for (int i = 0; i < tenantsCount; i++) {
+            IggyTcpClient client = IggyTcpClient.builder()
+                    .host(hostAndPort.host())
+                    .port(hostAndPort.port())
+                    .credentials(users[i], PASSWORD)
+                    .build();
+            tenants.add(Tenant.newTenant(i + 1, streams[i], users[i], client));
+        }
+
+        if (ensureAccess) {
+            log.info("Ensuring access to streams for each tenant");
+            for (Tenant tenant : tenants) {
+                List<String> unavailable = new ArrayList<>();
+                for (Tenant other : tenants) {
+                    if (!other.stream().equals(tenant.stream())) {
+                        unavailable.add(other.stream());
+                    }
+                }
+                ensureStreamAccess(tenant.client(), tenant.stream(), unavailable);
+            }
+        }
+
+        log.info("Creating {} producer(s) for each tenant", producersCount);
+        for (Tenant tenant : tenants) {
+            List<TenantProducer> producers =
+                    createProducers(tenant.client(), producersCount, partitionsCount, tenant.stream(), TOPICS);
+            tenant.addProducers(producers);
+            log.info(
+                    "Created {} producer(s) for tenant stream: {}, username: {}",
+                    producersCount,
+                    tenant.stream(),
+                    tenant.user());
+        }
+
+        log.info("Starting {} producer(s) for each tenant", producersCount);
+        ExecutorService executor = Executors.newCachedThreadPool();
+        try {
+            List<Future<?>> tasks = new ArrayList<>();
+            for (Tenant tenant : tenants) {
+                for (TenantProducer producer : tenant.producers()) {
+                    tasks.add(executor.submit(() ->
+                            sendBatches(tenant.id(), producer, batchesLimit, messagesPerBatch, intervalMs, TOPICS.length)));
+                }
+            }
+            waitFor(tasks);
+        } finally {
+            // Always request shutdown, even if submitting or waiting fails.
+            executor.shutdown();
+        }
+        log.info("Disconnecting clients");
+    }
+
+ private static void sendBatches(
+ int tenantId,
+ TenantProducer producer,
+ long batchesCount,
+ int batchLength,
+ long intervalMs,
+ int topicsCount) {
+ long counter = 1;
+ long eventsId = 1;
+ long logsId = 1;
+ long notificationsId = 1;
+
+ while (counter <= (long) topicsCount * batchesCount) {
+ long messageId;
+ String messagePrefix;
+ switch (producer.topic()) {
+ case "events" -> {
+ eventsId += 1;
+ messageId = eventsId;
+ messagePrefix = "event";
+ }
+ case "logs" -> {
+ logsId += 1;
+ messageId = logsId;
+ messagePrefix = "log";
+ }
+ case "notifications" -> {
+ notificationsId += 1;
+ messageId = notificationsId;
+ messagePrefix = "notification";
+ }
+ default -> throw new IllegalStateException("Invalid topic");
+ }
+
+ List<Message> messages = new ArrayList<>(batchLength);
+ List<String> payloads = new ArrayList<>(batchLength);
+ for (int i = 0; i < batchLength; i++) {
+ String payload = messagePrefix + "-" + producer.id() + "-" +
messageId;
+ messages.add(Message.of(payload));
+ payloads.add(payload);
+ }
+
+ try {
+ producer.client()
+ .messages()
+ .sendMessages(producer.streamId(), producer.topicId(),
producer.partitioning(), messages);
+ log.info(
+ "Sent: {} message(s) by tenant: {}, producer: {}, to:
{} -> {}",
+ batchLength,
+ tenantId,
+ producer.id(),
+ producer.stream(),
+ producer.topic());
+ } catch (Exception error) {
+ log.error(
+ "Failed to send: {} message(s) to: {} -> {} by tenant:
{}, producer: {} with error: {}",
+ batchLength,
+ producer.stream(),
+ producer.topic(),
+ tenantId,
+ producer.id(),
+ error.getMessage());
+ }
+
+ counter += 1;
+
+ if (intervalMs > 0) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(intervalMs);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+ }
+
+ private static List<TenantProducer> createProducers(
+ IggyTcpClient client, int producersCount, int partitionsCount,
String stream, String[] topics) {
+ List<TenantProducer> producers = new ArrayList<>();
+ StreamId streamId = StreamId.of(stream);
+
+ for (String topic : topics) {
+ TopicId topicId = TopicId.of(topic);
+ ensureTopic(client, streamId, topicId, partitionsCount);
+
+ for (int id = 1; id <= producersCount; id++) {
+ Partitioning partitioning = Partitioning.balanced();
+ producers.add(TenantProducer.newProducer(id, stream, topic,
streamId, topicId, client, partitioning));
+ }
+ }
+ return producers;
+ }
+
+ private static void ensureTopic(IggyTcpClient client, StreamId streamId,
TopicId topicId, int partitionsCount) {
+ Optional<TopicDetails> topic = tryGetTopic(client, streamId, topicId);
+ if (topic.isPresent()) {
+ log.info("Topic {} already exists for stream {}",
topicId.getName(), streamId.getName());
+ return;
+ }
+
+ client.topics()
+ .createTopic(
+ streamId,
+ (long) partitionsCount,
+ CompressionAlgorithm.None,
+ BigInteger.ZERO,
+ BigInteger.ZERO,
+ Optional.empty(),
+ topicId.getName());
+ log.info("Created topic {} for stream {}", topicId.getName(),
streamId.getName());
+ }
+
+    /**
+     * Verifies that {@code client} can look up its own stream and none of the others.
+     *
+     * @throws IllegalStateException if the own stream is not visible, or a foreign stream is
+     */
+    private static void ensureStreamAccess(
+            IggyTcpClient client, String availableStream, List<String> unavailableStreams) {
+        if (tryGetStream(client, StreamId.of(availableStream)).isEmpty()) {
+            throw new IllegalStateException("No access to stream: " + availableStream);
+        }
+        log.info("Ensured access to stream: {}", availableStream);
+
+        for (String foreign : unavailableStreams) {
+            if (tryGetStream(client, StreamId.of(foreign)).isPresent()) {
+                throw new IllegalStateException("Access to stream: " + foreign + " should not be allowed");
+            }
+            log.info("Ensured no access to stream: {}", foreign);
+        }
+    }
+
+    /**
+     * Looks up a stream, turning any exception into an empty result.
+     *
+     * @param client client used for the lookup
+     * @param streamId stream to fetch
+     * @return the lookup result, or {@code Optional.empty()} if the lookup threw (the failure
+     *     is logged at debug level)
+     */
+    private static Optional<StreamDetails> tryGetStream(IggyTcpClient client, StreamId streamId) {
+        Optional<StreamDetails> found;
+        try {
+            found = client.streams().getStream(streamId);
+        } catch (Exception ex) {
+            log.debug("Unable to get stream {}: {}", streamId.getName(), ex.getMessage());
+            found = Optional.empty();
+        }
+        return found;
+    }
+
+    /**
+     * Looks up a topic, turning any exception into an empty result.
+     *
+     * @param client client used for the lookup
+     * @param streamId stream that owns the topic
+     * @param topicId topic to fetch
+     * @return the lookup result, or {@code Optional.empty()} if the lookup threw (the failure
+     *     is logged at debug level)
+     */
+    private static Optional<TopicDetails> tryGetTopic(IggyTcpClient client, StreamId streamId, TopicId topicId) {
+        Optional<TopicDetails> found;
+        try {
+            found = client.topics().getTopic(streamId, topicId);
+        } catch (Exception ex) {
+            log.debug(
+                    "Unable to get topic {} in stream {}: {}", topicId.getName(), streamId.getName(), ex.getMessage());
+            found = Optional.empty();
+        }
+        return found;
+    }
+
+    /**
+     * Ensures the stream exists and that a user with permissions scoped to it exists.
+     *
+     * <p>The stream is reused when it is already present and created otherwise; the log
+     * message states which of the two happened. If a user with the given name already
+     * exists it is left untouched (its permissions are not updated).
+     *
+     * @param client client used for all stream and user operations
+     * @param streamName name of the stream to look up or create
+     * @param username name of the user to create
+     */
+    private static void createStreamAndUser(IggyTcpClient client, String streamName, String username) {
+        Optional<StreamDetails> existing = client.streams().getStream(StreamId.of(streamName));
+        StreamDetails stream;
+        if (existing.isPresent()) {
+            stream = existing.get();
+            // Do not claim a creation that did not happen.
+            log.info("Stream: {} already exists with ID: {}", streamName, stream.id());
+        } else {
+            stream = client.streams().createStream(streamName);
+            log.info("Created stream: {} with ID: {}", streamName, stream.id());
+        }
+
+        // Check for the user first so no permission objects are built when nothing is created.
+        if (userExists(client, username)) {
+            log.info("User: {} already exists, skipping creation", username);
+            return;
+        }
+
+        // Permissions are keyed by the stream ID; every global permission flag is false.
+        Map<Long, StreamPermissions> streamPermissions = new HashMap<>();
+        streamPermissions.put(stream.id(), new StreamPermissions(false, true, true, true, false, true, Map.of()));
+
+        Permissions permissions = new Permissions(
+                new GlobalPermissions(false, false, false, false, false, false, false, false, false, false),
+                streamPermissions);
+
+        UserInfoDetails user =
+                client.users().createUser(username, PASSWORD, UserStatus.Active, Optional.of(permissions));
+        log.info(
+                "Created user: {} with ID: {}, with permissions for stream: {}", username, user.id(), streamName);
+    }
+
+    /**
+     * Reports whether a user with the given name is among the users returned by the server.
+     *
+     * @param client client used to list users
+     * @param username name to look for
+     * @return {@code true} if a listed user has exactly this name; {@code false} if none does
+     *     or if listing the users threw (the failure is logged at debug level)
+     */
+    private static boolean userExists(IggyTcpClient client, String username) {
+        boolean found = false;
+        try {
+            for (UserInfo candidate : client.users().getUsers()) {
+                if (candidate.username().equals(username)) {
+                    found = true;
+                    break;
+                }
+            }
+        } catch (Exception ex) {
+            log.debug("Unable to check if user {} exists: {}", username, ex.getMessage());
+        }
+        return found;
+    }
+
+    /**
+     * Blocks until every given task has finished.
+     *
+     * <p>A failed task is logged and the wait continues with the next one. If the waiting
+     * thread is interrupted, its interrupt flag is restored and the remaining tasks are not
+     * awaited.
+     *
+     * @param tasks futures to wait on, in order
+     */
+    private static void waitFor(List<Future<?>> tasks) {
+        for (Future<?> pending : tasks) {
+            try {
+                pending.get();
+            } catch (InterruptedException interrupted) {
+                // Preserve the interrupt for the caller and stop waiting.
+                Thread.currentThread().interrupt();
+                break;
+            } catch (Exception failure) {
+                log.warn("Producer task failed: {}", failure.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Splits a {@code host:port} string at the first colon.
+     *
+     * <p>A blank host falls back to {@code "localhost"}. The port falls back to {@code 8090}
+     * when no colon is present or when the text after it is not a valid integer (a warning
+     * is logged in the latter case).
+     *
+     * @param address address text, e.g. {@code "localhost:8090"}
+     * @return the parsed host and port
+     */
+    private static HostAndPort parseAddress(String address) {
+        int separator = address.indexOf(':');
+        String hostPart = separator < 0 ? address : address.substring(0, separator);
+        String host = hostPart.isBlank() ? "localhost" : hostPart;
+
+        int port = 8090;
+        if (separator >= 0) {
+            try {
+                port = Integer.parseInt(address.substring(separator + 1));
+            } catch (NumberFormatException ignored) {
+                log.warn("Invalid port in given address, defaulting to 8090");
+            }
+        }
+        return new HostAndPort(host, port);
+    }
+
+    /** Host name and port pair, as returned by {@code parseAddress}. */
+    private record HostAndPort(String host, int port) {}
+
+    /**
+     * Groups a tenant's ID, stream name, user name, client, and producers.
+     *
+     * <p>The {@code producers} list is modified in place by {@code addProducers}, so the
+     * record is not immutable.
+     */
+    private record Tenant(int id, String stream, String user, IggyTcpClient client, List<TenantProducer> producers) {
+        /** Creates a tenant that starts with an empty, mutable producer list. */
+        static Tenant newTenant(int id, String stream, String user, IggyTcpClient client) {
+            return new Tenant(id, stream, user, client, new ArrayList<>());
+        }
+
+        /** Appends the given producers to this tenant's producer list. */
+        void addProducers(List<TenantProducer> producers) {
+            this.producers.addAll(producers);
+        }
+    }
+
+    /**
+     * Describes one producer: its ID, the stream and topic it targets (both as names and as
+     * identifiers), the client it uses, and its partitioning.
+     */
+    private record TenantProducer(
+            int id,
+            String stream,
+            String topic,
+            StreamId streamId,
+            TopicId topicId,
+            IggyTcpClient client,
+            Partitioning partitioning) {
+        /** Factory that forwards all arguments unchanged to the record constructor. */
+        static TenantProducer newProducer(
+                int id,
+                String stream,
+                String topic,
+                StreamId streamId,
+                TopicId topicId,
+                IggyTcpClient client,
+                Partitioning partitioning) {
+            return new TenantProducer(id, stream, topic, streamId, topicId, client, partitioning);
+        }
+    }
+}
diff --git
a/examples/java/src/main/java/org/apache/iggy/examples/shared/Messages.java
b/examples/java/src/main/java/org/apache/iggy/examples/shared/Messages.java
new file mode 100644
index 000000000..93e61d6b0
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/iggy/examples/shared/Messages.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.examples.shared;
+
+import tools.jackson.core.JacksonException;
+import tools.jackson.databind.ObjectMapper;
+import tools.jackson.databind.json.JsonMapper;
+
+import java.time.Instant;
+
+/**
+ * Message types shared by the examples, together with their JSON serialization helpers.
+ *
+ * <p>All serialization goes through one shared {@code ObjectMapper}; serialization failures
+ * are rethrown as {@link IllegalStateException}.
+ */
+public final class Messages {
+    public static final String ORDER_CREATED_TYPE = "order_created";
+    public static final String ORDER_CONFIRMED_TYPE = "order_confirmed";
+    public static final String ORDER_REJECTED_TYPE = "order_rejected";
+
+    private static final ObjectMapper MAPPER = JsonMapper.builder().build();
+
+    /** A message that knows its type tag and can render itself as JSON. */
+    public interface SerializableMessage {
+        /** Returns the type tag stored in the envelope for this message. */
+        String getMessageType();
+
+        /**
+         * Serializes this message as JSON.
+         *
+         * @throws IllegalStateException if serialization fails
+         */
+        default String toJson() {
+            try {
+                return MAPPER.writeValueAsString(this);
+            } catch (JacksonException e) {
+                throw new IllegalStateException("Failed to serialize message", e);
+            }
+        }
+
+        /**
+         * Wraps this message in an {@link Envelope} and serializes the envelope as JSON.
+         *
+         * @throws IllegalStateException if the payload or the envelope cannot be serialized
+         */
+        default String toJsonEnvelope() {
+            return Envelope.of(getMessageType(), this).toJson();
+        }
+    }
+
+    /** Type tag plus a payload that is already serialized to a JSON string. */
+    public record Envelope(String messageType, String payload) {
+        /**
+         * Builds an envelope by serializing {@code payload} to JSON.
+         *
+         * @throws IllegalStateException if the payload cannot be serialized
+         */
+        public static Envelope of(String messageType, Object payload) {
+            String serialized;
+            try {
+                serialized = MAPPER.writeValueAsString(payload);
+            } catch (JacksonException e) {
+                throw new IllegalStateException("Failed to serialize payload", e);
+            }
+            return new Envelope(messageType, serialized);
+        }
+
+        /**
+         * Serializes this envelope as JSON.
+         *
+         * @throws IllegalStateException if serialization fails
+         */
+        public String toJson() {
+            try {
+                return MAPPER.writeValueAsString(this);
+            } catch (JacksonException e) {
+                throw new IllegalStateException("Failed to serialize envelope", e);
+            }
+        }
+    }
+
+    /** Message tagged with {@link #ORDER_CREATED_TYPE}. */
+    public record OrderCreated(
+            long orderId, String currencyPair, double price, double quantity, String side, Instant timestamp)
+            implements SerializableMessage {
+        @Override
+        public String getMessageType() {
+            return ORDER_CREATED_TYPE;
+        }
+    }
+
+    /** Message tagged with {@link #ORDER_CONFIRMED_TYPE}. */
+    public record OrderConfirmed(long orderId, double price, Instant timestamp) implements SerializableMessage {
+        @Override
+        public String getMessageType() {
+            return ORDER_CONFIRMED_TYPE;
+        }
+    }
+
+    /** Message tagged with {@link #ORDER_REJECTED_TYPE}. */
+    public record OrderRejected(long orderId, Instant timestamp, String reason) implements SerializableMessage {
+        @Override
+        public String getMessageType() {
+            return ORDER_REJECTED_TYPE;
+        }
+    }
+}
diff --git
a/examples/java/src/main/java/org/apache/iggy/examples/shared/MessagesGenerator.java
b/examples/java/src/main/java/org/apache/iggy/examples/shared/MessagesGenerator.java
new file mode 100644
index 000000000..42a77291d
--- /dev/null
+++
b/examples/java/src/main/java/org/apache/iggy/examples/shared/MessagesGenerator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.examples.shared;
+
+import org.apache.iggy.examples.shared.Messages.OrderConfirmed;
+import org.apache.iggy.examples.shared.Messages.OrderCreated;
+import org.apache.iggy.examples.shared.Messages.OrderRejected;
+import org.apache.iggy.examples.shared.Messages.SerializableMessage;
+
+import java.time.Instant;
+import java.util.random.RandomGenerator;
+import java.util.random.RandomGeneratorFactory;
+
+/**
+ * Produces random order messages with increasing order IDs.
+ *
+ * <p>The ID counter is a plain field, so one instance should not be shared between threads
+ * without external synchronization.
+ */
+public final class MessagesGenerator {
+
+    private static final String[] CURRENCY_PAIRS = {"EUR/USD", "EUR/GBP", "USD/GBP", "EUR/PLN", "USD/PLN"};
+    private final RandomGenerator rng = RandomGeneratorFactory.getDefault().create();
+    private long orderId = 0L;
+
+    public MessagesGenerator() {}
+
+    /** Returns a created, confirmed, or rejected order message, chosen at random. */
+    public SerializableMessage generate() {
+        int kind = rng.nextInt(3);
+        if (kind == 0) {
+            return generateOrderCreated();
+        }
+        if (kind == 1) {
+            return generateOrderConfirmed();
+        }
+        if (kind == 2) {
+            return generateOrderRejected();
+        }
+        throw new IllegalStateException("Unexpected message type");
+    }
+
+    /** Returns an {@code OrderCreated} with the next order ID and random pair, price, quantity, and side. */
+    public SerializableMessage generateOrderCreated() {
+        long id = nextOrderId();
+        String pair = CURRENCY_PAIRS[rng.nextInt(CURRENCY_PAIRS.length)];
+        double price = rng.nextDouble(10.0, 1000.0);
+        double quantity = rng.nextDouble(0.1, 1.0);
+        boolean buy = rng.nextInt(2) == 0;
+
+        return new OrderCreated(id, pair, price, quantity, buy ? "buy" : "sell", Instant.now());
+    }
+
+    /** Returns an {@code OrderConfirmed} with the next order ID and a random price. */
+    private SerializableMessage generateOrderConfirmed() {
+        long id = nextOrderId();
+        double price = rng.nextDouble(10.0, 1000.0);
+        return new OrderConfirmed(id, price, Instant.now());
+    }
+
+    /** Returns an {@code OrderRejected} with the next order ID and one of two reasons. */
+    private SerializableMessage generateOrderRejected() {
+        long id = nextOrderId();
+        boolean cancelled = rng.nextInt(2) == 0;
+        return new OrderRejected(id, Instant.now(), cancelled ? "cancelled_by_user" : "other");
+    }
+
+    /** Advances the order ID counter and returns the new value. */
+    private long nextOrderId() {
+        orderId += 1;
+        return orderId;
+    }
+}
diff --git
a/examples/java/src/main/java/org/apache/iggy/examples/sinkdataproducer/SinkDataProducer.java
b/examples/java/src/main/java/org/apache/iggy/examples/sinkdataproducer/SinkDataProducer.java
new file mode 100644
index 000000000..44ad2c007
--- /dev/null
+++
b/examples/java/src/main/java/org/apache/iggy/examples/sinkdataproducer/SinkDataProducer.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.examples.sinkdataproducer;
+
+import org.apache.iggy.client.blocking.tcp.IggyTcpClient;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.Partitioning;
+import org.apache.iggy.topic.CompressionAlgorithm;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import tools.jackson.core.JacksonException;
+import tools.jackson.databind.ObjectMapper;
+import tools.jackson.databind.json.JsonMapper;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Random;
+
+/**
+ * Example producer that sends batches of random JSON user records to one topic.
+ *
+ * <p>It creates the stream and topic if they are missing, sends {@code MAX_BATCHES} batches,
+ * and then returns.
+ */
+public final class SinkDataProducer {
+
+    private static final String[] SOURCES = {"browser", "mobile", "desktop", "email", "network", "other"};
+    private static final String[] STATES = {"active", "inactive", "blocked", "deleted", "unknown"};
+    private static final String[] DOMAINS = {"gmail.com", "yahoo.com", "hotmail.com", "outlook.com", "aol.com"};
+
+    private static final int MAX_BATCHES = 100;
+    private static final Logger log = LoggerFactory.getLogger(SinkDataProducer.class);
+
+    private static final ObjectMapper MAPPER = JsonMapper.builder().build();
+
+    private SinkDataProducer() {}
+
+    /**
+     * Connects with the hard-coded address and credentials, provisions the stream and topic,
+     * and sends the batches.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) {
+        String address = "localhost:8090";
+        String username = "iggy";
+        String password = "iggy";
+        String stream = "qw";
+        String topic = "records";
+
+        HostAndPort hostAndPort = parseAddress(address);
+        var client = IggyTcpClient.builder()
+                .host(hostAndPort.host())
+                .port(hostAndPort.port())
+                .credentials(username, password)
+                .build();
+
+        StreamId streamId = StreamId.of(stream);
+        TopicId topicId = TopicId.of(topic);
+        Partitioning partitioning = Partitioning.balanced();
+
+        Random random = new Random();
+        int batchesCount = 0;
+        log.info("Starting data producer...");
+
+        createStreamIfMissing(client, streamId);
+        createTopicIfMissing(client, streamId, topicId);
+
+        while (batchesCount < MAX_BATCHES) {
+            // Each batch holds between 1000 and 1099 generated records.
+            int recordsCount = random.nextInt(100) + 1000;
+            List<Message> messages = new ArrayList<>(recordsCount);
+
+            for (int i = 0; i < recordsCount; i++) {
+                UserRecord record = randomRecord(random);
+                try {
+                    messages.add(Message.of(record.toJson(MAPPER)));
+                } catch (JacksonException e) {
+                    log.warn("Failed to serialize record, skipping.", e);
+                }
+            }
+
+            client.messages().sendMessages(streamId, topicId, partitioning, messages);
+            // Report what was actually sent; records skipped above are not part of the batch.
+            log.info("Sent {} messages", messages.size());
+            batchesCount++;
+        }
+
+        log.info("Reached maximum batches count");
+    }
+
+    /** Creates the stream named by {@code streamId} unless a lookup already returns it. */
+    private static void createStreamIfMissing(IggyTcpClient client, StreamId streamId) {
+        client.streams()
+                .getStream(streamId)
+                .ifPresentOrElse(existing -> log.info("Stream {} already exists.", streamId.getName()), () -> {
+                    client.streams().createStream(streamId.getName());
+                    log.info("Created stream {}.", streamId.getName());
+                });
+    }
+
+    /** Creates the topic named by {@code topicId}, with one partition, unless a lookup already returns it. */
+    private static void createTopicIfMissing(IggyTcpClient client, StreamId streamId, TopicId topicId) {
+        client.topics()
+                .getTopic(streamId, topicId)
+                .ifPresentOrElse(existing -> log.info("Topic {} already exists.", topicId.getName()), () -> {
+                    client.topics()
+                            .createTopic(
+                                    streamId,
+                                    1L,
+                                    CompressionAlgorithm.None,
+                                    java.math.BigInteger.ZERO,
+                                    java.math.BigInteger.ZERO,
+                                    java.util.Optional.empty(),
+                                    topicId.getName());
+                    log.info("Created topic {}.", topicId.getName());
+                });
+    }
+
+    /**
+     * Builds one record from random values.
+     *
+     * <p>The creation time is now minus 0 to 999 days, the user ID is {@code user_1} to
+     * {@code user_99}, the user type is 1 to 4, and the message is 10 to 100 characters long.
+     */
+    private static UserRecord randomRecord(Random random) {
+        String source = SOURCES[random.nextInt(SOURCES.length)];
+        String state = STATES[random.nextInt(STATES.length)];
+        String email = randomEmail(random);
+        Instant createdAt = Instant.now().minus(random.nextLong(0, 1000), ChronoUnit.DAYS);
+
+        String userId = "user_" + (random.nextInt(99) + 1);
+        int userType = random.nextInt(4) + 1;
+        String message = randomString(random, random.nextInt(91) + 10);
+
+        return new UserRecord(userId, userType, email, source, state, createdAt, message);
+    }
+
+    /** Returns a random 3 to 19 character local part joined with one of {@code DOMAINS}. */
+    private static String randomEmail(Random random) {
+        int nameLength = random.nextInt(17) + 3;
+        String name = randomString(random, nameLength);
+        String domain = DOMAINS[random.nextInt(DOMAINS.length)];
+        return name + "@" + domain;
+    }
+
+    /** Returns {@code size} random characters drawn from digits and upper/lower-case ASCII letters. */
+    private static String randomString(Random random, int size) {
+        StringBuilder builder = new StringBuilder(size);
+        for (int i = 0; i < size; i++) {
+            // 0-9 selects a digit; 10-35 selects a letter offset from 'A' or 'a'.
+            int choice = random.nextInt(36);
+            if (choice < 10) {
+                builder.append((char) ('0' + choice));
+            } else {
+                char base = random.nextBoolean() ? 'A' : 'a';
+                builder.append((char) (base + (choice - 10)));
+            }
+        }
+        return builder.toString();
+    }
+
+    /**
+     * Parses {@code host:port} text, lower-casing it and removing any {@code iggy://} text first.
+     *
+     * <p>A blank host falls back to {@code "localhost"}; a missing or non-numeric port falls
+     * back to {@code 8090}.
+     */
+    private static HostAndPort parseAddress(String address) {
+        String normalized = address.toLowerCase(Locale.ROOT).replace("iggy://", "");
+        String[] parts = normalized.split(":", 2);
+        String host = parts.length > 0 && !parts[0].isBlank() ? parts[0] : "localhost";
+        int port = 8090;
+        if (parts.length == 2) {
+            try {
+                port = Integer.parseInt(parts[1]);
+            } catch (NumberFormatException ignored) {
+                log.warn("Invalid port in IGGY_ADDRESS, defaulting to 8090");
+            }
+        }
+        return new HostAndPort(host, port);
+    }
+
+    /** Host name and port pair, as returned by {@code parseAddress}. */
+    private record HostAndPort(String host, int port) {}
+
+    /** One generated record; serialized to JSON as a message payload. */
+    private record UserRecord(
+            String userId, int userType, String email, String source, String state, Instant createdAt, String message) {
+        /** Serializes this record with the given mapper; the caller handles {@code JacksonException}. */
+        String toJson(ObjectMapper mapper) {
+            return mapper.writeValueAsString(this);
+        }
+    }
+}
diff --git
a/examples/java/src/main/java/org/apache/iggy/examples/streambuilder/StreamBasic.java
b/examples/java/src/main/java/org/apache/iggy/examples/streambuilder/StreamBasic.java
new file mode 100644
index 000000000..e6d9c73a4
--- /dev/null
+++
b/examples/java/src/main/java/org/apache/iggy/examples/streambuilder/StreamBasic.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.examples.streambuilder;
+
+import org.apache.iggy.client.blocking.tcp.IggyTcpClient;
+import org.apache.iggy.consumergroup.Consumer;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.Partitioning;
+import org.apache.iggy.message.PolledMessages;
+import org.apache.iggy.message.PollingStrategy;
+import org.apache.iggy.stream.StreamDetails;
+import org.apache.iggy.topic.CompressionAlgorithm;
+import org.apache.iggy.topic.TopicDetails;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Minimal end-to-end example: create a stream and topic, send three messages to one
+ * partition, read them back, and delete the stream.
+ */
+public final class StreamBasic {
+
+    private static final Logger log = LoggerFactory.getLogger(StreamBasic.class);
+
+    private static final String STREAM_NAME = "test_stream";
+    private static final StreamId STREAM_ID = StreamId.of(STREAM_NAME);
+
+    private static final String TOPIC_NAME = "test_topic";
+    private static final TopicId TOPIC_ID = TopicId.of(TOPIC_NAME);
+
+    private static final long PARTITION_ID = 0L;
+    private static final long POLL_BATCH_SIZE = 50L;
+    private static final int EXPECTED_MESSAGES = 3;
+
+    // Limits for consumeMessages so a missing message cannot turn into an endless hot loop.
+    private static final int MAX_EMPTY_POLLS = 10;
+    private static final long EMPTY_POLL_BACKOFF_MS = 100L;
+
+    private StreamBasic() {}
+
+    /**
+     * Runs the example against a server on {@code localhost:8090}.
+     *
+     * <p>The stream is deleted in a {@code finally} block, so cleanup is attempted even when
+     * sending or consuming fails.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) {
+        log.info("Build iggy client and connect it.");
+        var client = IggyTcpClient.builder()
+                .host("localhost")
+                .port(8090)
+                .credentials("iggy", "iggy")
+                .build();
+
+        try {
+            ensureStreamAndTopic(client);
+
+            log.info("Build iggy producer & consumer");
+            log.info("Send 3 test messages...");
+            sendMessage(client, "Hello World");
+            sendMessage(client, "Hola Iggy");
+            sendMessage(client, "Hi Apache");
+
+            log.info("Consume the messages");
+            consumeMessages(client);
+
+            // NOTE(review): the log mentions a client shutdown, but no shutdown call is made here.
+            log.info("Stop the message stream and shutdown iggy client");
+        } finally {
+            deleteStreamIfExists(client);
+        }
+    }
+
+    /**
+     * Polls the partition from offset zero until {@code EXPECTED_MESSAGES} messages are logged.
+     *
+     * <p>An empty poll is retried after a short pause. After {@code MAX_EMPTY_POLLS}
+     * consecutive empty polls, or if the thread is interrupted while pausing, the method logs
+     * a warning and returns instead of polling forever.
+     */
+    private static void consumeMessages(IggyTcpClient client) {
+        BigInteger offset = BigInteger.ZERO;
+        Consumer consumer = Consumer.of(0L);
+        int consumedMessages = 0;
+        int emptyPolls = 0;
+
+        while (consumedMessages < EXPECTED_MESSAGES) {
+            PolledMessages polledMessages = client.messages()
+                    .pollMessages(
+                            STREAM_ID,
+                            TOPIC_ID,
+                            Optional.of(PARTITION_ID),
+                            consumer,
+                            PollingStrategy.offset(offset),
+                            POLL_BATCH_SIZE,
+                            false);
+
+            var batch = polledMessages.messages();
+            if (batch.size() == 0) {
+                emptyPolls++;
+                if (emptyPolls >= MAX_EMPTY_POLLS) {
+                    log.warn(
+                            "Received only {} of {} expected messages, giving up.",
+                            consumedMessages,
+                            EXPECTED_MESSAGES);
+                    return;
+                }
+                try {
+                    Thread.sleep(EMPTY_POLL_BACKOFF_MS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    log.warn("Interrupted while waiting for messages.");
+                    return;
+                }
+                continue;
+            }
+            emptyPolls = 0;
+
+            for (Message message : batch) {
+                String payload = new String(message.payload(), StandardCharsets.UTF_8);
+                log.info(
+                        "Message received: {} at offset: {} in partition ID: {}",
+                        payload,
+                        message.header().offset(),
+                        polledMessages.partitionId());
+
+                consumedMessages++;
+                if (consumedMessages == EXPECTED_MESSAGES) {
+                    break;
+                }
+            }
+
+            // Continue after the batch that was just returned.
+            offset = offset.add(BigInteger.valueOf(batch.size()));
+        }
+    }
+
+    /** Creates the stream and then the single-partition topic, each only if its lookup returns empty. */
+    private static void ensureStreamAndTopic(IggyTcpClient client) {
+        Optional<StreamDetails> stream = client.streams().getStream(STREAM_ID);
+        if (stream.isPresent()) {
+            log.info("Stream {} already exists.", STREAM_NAME);
+        } else {
+            client.streams().createStream(STREAM_NAME);
+            log.info("Stream {} was created.", STREAM_NAME);
+        }
+
+        Optional<TopicDetails> topic = client.topics().getTopic(STREAM_ID, TOPIC_ID);
+        if (topic.isPresent()) {
+            log.info("Topic {} already exists.", TOPIC_NAME);
+        } else {
+            client.topics()
+                    .createTopic(
+                            STREAM_ID,
+                            1L,
+                            CompressionAlgorithm.None,
+                            BigInteger.ZERO,
+                            BigInteger.ZERO,
+                            Optional.empty(),
+                            TOPIC_NAME);
+            log.info("Topic {} was created.", TOPIC_NAME);
+        }
+    }
+
+    /** Sends one message with the given text payload to {@code PARTITION_ID}. */
+    private static void sendMessage(IggyTcpClient client, String payload) {
+        client.messages()
+                .sendMessages(
+                        STREAM_ID, TOPIC_ID, Partitioning.partitionId(PARTITION_ID), List.of(Message.of(payload)));
+    }
+
+    /**
+     * Deletes the example stream if it is still present.
+     *
+     * <p>This runs from a {@code finally} block, so both the lookup and the delete are
+     * best-effort: a failure is logged rather than thrown, which keeps it from masking an
+     * exception raised by the main flow.
+     */
+    private static void deleteStreamIfExists(IggyTcpClient client) {
+        try {
+            Optional<StreamDetails> stream = client.streams().getStream(STREAM_ID);
+            if (stream.isEmpty()) {
+                log.info("Stream {} already removed.", STREAM_NAME);
+                return;
+            }
+
+            client.streams().deleteStream(STREAM_ID);
+            log.info("Stream {} deleted.", STREAM_NAME);
+        } catch (Exception e) {
+            log.warn("Failed to delete stream {}: {}", STREAM_NAME, e.getMessage());
+        }
+    }
+}
diff --git a/foreign/java/examples/build.gradle.kts
b/foreign/java/examples/build.gradle.kts
deleted file mode 100644
index d92ebb06a..000000000
--- a/foreign/java/examples/build.gradle.kts
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-plugins {
- id("iggy.java-application-conventions")
-}
-
-application {
- mainClass.set("org.apache.iggy.consumer.SimpleConsumer")
-}
-
-dependencies {
- implementation(project(":iggy"))
- implementation(libs.slf4j.api)
- runtimeOnly(libs.logback.classic)
- runtimeOnly(libs.netty.dns.macos) { artifact { classifier = "osx-aarch_64"
} }
-}
-
-// Task for running async consumer example
-tasks.register<JavaExec>("runAsyncConsumer") {
- group = "application"
- description = "Run the Async Consumer example with Netty"
- classpath = sourceSets["main"].runtimeClasspath
- mainClass.set("org.apache.iggy.async.AsyncConsumerExample")
-}
-
-// Task for running simple consumer
-tasks.register<JavaExec>("runSimpleConsumer") {
- group = "application"
- description = "Run the Simple Consumer example"
- classpath = sourceSets["main"].runtimeClasspath
- mainClass.set("org.apache.iggy.consumer.SimpleConsumer")
-}
-
-// Task for running simple producer
-tasks.register<JavaExec>("runSimpleProducer") {
- group = "application"
- description = "Run the Simple Producer example"
- classpath = sourceSets["main"].runtimeClasspath
- mainClass.set("org.apache.iggy.producer.SimpleProducer")
-}
-
-// Task for running simple async test
-tasks.register<JavaExec>("runSimpleAsyncTest") {
- group = "application"
- description = "Run the Simple Async Test for debugging"
- classpath = sourceSets["main"].runtimeClasspath
- mainClass.set("org.apache.iggy.async.SimpleAsyncTest")
-}
-
-// Task for running async producer
-tasks.register<JavaExec>("runAsyncProducer") {
- group = "application"
- description = "Run the Async Producer example"
- classpath = sourceSets["main"].runtimeClasspath
- mainClass.set("org.apache.iggy.async.AsyncProducer")
-}
diff --git
a/foreign/java/examples/src/main/java/org/apache/iggy/consumer/SimpleConsumer.java
b/foreign/java/examples/src/main/java/org/apache/iggy/consumer/SimpleConsumer.java
deleted file mode 100644
index 92fd51a4e..000000000
---
a/foreign/java/examples/src/main/java/org/apache/iggy/consumer/SimpleConsumer.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iggy.consumer;
-
-import org.apache.iggy.client.blocking.IggyBaseClient;
-import org.apache.iggy.client.blocking.tcp.IggyTcpClient;
-import org.apache.iggy.consumergroup.Consumer;
-import org.apache.iggy.consumergroup.ConsumerGroupDetails;
-import org.apache.iggy.identifier.ConsumerId;
-import org.apache.iggy.identifier.StreamId;
-import org.apache.iggy.identifier.TopicId;
-import org.apache.iggy.message.Message;
-import org.apache.iggy.message.PollingStrategy;
-import org.apache.iggy.stream.StreamDetails;
-import org.apache.iggy.topic.CompressionAlgorithm;
-import org.apache.iggy.topic.TopicDetails;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Optional;
-
-import static java.util.Optional.empty;
-
-public final class SimpleConsumer {
-
- private static final String STREAM_NAME = "dev01";
- private static final StreamId STREAM_ID = StreamId.of(STREAM_NAME);
- private static final String TOPIC_NAME = "events";
- private static final TopicId TOPIC_ID = TopicId.of(TOPIC_NAME);
- private static final String GROUP_NAME = "simple-consumer";
- private static final ConsumerId GROUP_ID = ConsumerId.of(GROUP_NAME);
- private static final Logger log =
LoggerFactory.getLogger(SimpleConsumer.class);
-
- private SimpleConsumer() {}
-
- public static void main(String[] args) {
- var client = new IggyTcpClient("localhost", 8090);
- client.users().login("iggy", "iggy");
-
- createStream(client);
- createTopic(client);
- createConsumerGroup(client);
- client.consumerGroups().joinConsumerGroup(STREAM_ID, TOPIC_ID,
GROUP_ID);
-
- var messages = new ArrayList<Message>();
- while (messages.size() < 1000) {
- var polledMessages = client.messages()
- .pollMessages(
- STREAM_ID, TOPIC_ID, empty(),
Consumer.group(GROUP_ID), PollingStrategy.next(), 10L, true);
- messages.addAll(polledMessages.messages());
- log.debug(
- "Fetched {} messages from partition {}, current offset {}",
- polledMessages.messages().size(),
- polledMessages.partitionId(),
- polledMessages.currentOffset());
- }
- }
-
- private static void createStream(IggyBaseClient client) {
- Optional<StreamDetails> stream = client.streams().getStream(STREAM_ID);
- if (stream.isPresent()) {
- return;
- }
- client.streams().createStream(STREAM_NAME);
- }
-
- private static void createTopic(IggyBaseClient client) {
- Optional<TopicDetails> topic = client.topics().getTopic(STREAM_ID,
TOPIC_ID);
- if (topic.isPresent()) {
- return;
- }
- client.topics()
- .createTopic(
- STREAM_ID,
- 1L,
- CompressionAlgorithm.None,
- BigInteger.ZERO,
- BigInteger.ZERO,
- empty(),
- TOPIC_NAME);
- }
-
- private static void createConsumerGroup(IggyBaseClient client) {
- Optional<ConsumerGroupDetails> consumerGroup =
- client.consumerGroups().getConsumerGroup(STREAM_ID, TOPIC_ID,
GROUP_ID);
- if (consumerGroup.isPresent()) {
- return;
- }
- client.consumerGroups().createConsumerGroup(STREAM_ID, TOPIC_ID,
GROUP_NAME);
- }
-}
diff --git a/foreign/java/settings.gradle.kts b/foreign/java/settings.gradle.kts
index 0af0da426..050119ed8 100644
--- a/foreign/java/settings.gradle.kts
+++ b/foreign/java/settings.gradle.kts
@@ -22,9 +22,6 @@ rootProject.name = "iggy-java-sdk"
include("iggy")
project(":iggy").projectDir = file("java-sdk")
-include("iggy-example")
-project(":iggy-example").projectDir = file("examples")
-
// External processors - Stream processing integrations
include("iggy-connector-library")
project(":iggy-connector-library").projectDir =
file("external-processors/iggy-connector-flink/iggy-connector-library")
diff --git a/scripts/run-java-examples-from-readme.sh
b/scripts/run-java-examples-from-readme.sh
new file mode 100755
index 000000000..22a462f63
--- /dev/null
+++ b/scripts/run-java-examples-from-readme.sh
@@ -0,0 +1,162 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -euo pipefail
+
+# Script to run Java examples from examples/java/README.md
+# Usage: ./scripts/run-java-examples-from-readme.sh [TARGET]
+#
+# TARGET - Optional target architecture (e.g., x86_64-unknown-linux-musl)
+#          If not provided, uses the default target
+#
+# All paths are relative to the current directory, so run this from the
+# repository root. The script starts a prebuilt iggy-server, scans
+# examples/java/README.md for lines starting with `gradle` and executes them
+# in order. It stops at the first failing command and prints the iggy-server
+# log. A missing README, or one without any such command, counts as a failure.
+
+readonly LOG_FILE="iggy-server.log"
+readonly PID_FILE="iggy-server.pid"
+readonly TIMEOUT=300
+
+ROOT_WORKDIR="$(pwd)"
+TARGET="${1:-}"
+SERVER_PID=""
+
+# Stop the server if one was started and remove the PID file. Also installed
+# as an EXIT trap so that no exit path (timeout, failed cd, ...) leaks it.
+stop_server() {
+    if [ -n "${SERVER_PID}" ]; then
+        # The server may already be gone; that must not abort the script.
+        kill -TERM "${SERVER_PID}" 2>/dev/null || true
+        SERVER_PID=""
+    fi
+    rm -f "${ROOT_WORKDIR}/${PID_FILE}"
+}
+trap stop_server EXIT
+
+if [ -n "${TARGET}" ]; then
+    echo "Using target architecture: ${TARGET}"
+    SERVER_BIN="target/${TARGET}/debug/iggy-server"
+    BUILD_HINT="cargo build --target ${TARGET} --bin iggy-server"
+else
+    echo "Using default target architecture"
+    SERVER_BIN="target/debug/iggy-server"
+    BUILD_HINT="cargo build --bin iggy-server"
+fi
+
+# Remove old server data if present
+rm -rf local_data
+rm -f "${LOG_FILE}" "${PID_FILE}"
+
+# Check if server binary exists
+if [ ! -f "${SERVER_BIN}" ]; then
+    echo "Error: Server binary not found at ${SERVER_BIN}"
+    echo "Please build the server binary before running this script:"
+    echo " ${BUILD_HINT}"
+    exit 1
+fi
+
+echo "Using server binary at ${SERVER_BIN}"
+
+# Run iggy server using the prebuilt binary
+echo "Starting server from ${SERVER_BIN}..."
+IGGY_ROOT_USERNAME=iggy IGGY_ROOT_PASSWORD=iggy "${SERVER_BIN}" &>"${LOG_FILE}" &
+SERVER_PID=$!
+echo "${SERVER_PID}" >"${PID_FILE}"
+
+# Wait until "has started" shows up in the server log. Give up as soon as the
+# server process is gone, or after ${TIMEOUT} seconds.
+elapsed=0
+until grep -q "has started" "${LOG_FILE}" 2>/dev/null; do
+    if ! kill -0 "${SERVER_PID}" 2>/dev/null; then
+        echo "Server exited before it finished starting."
+        cat "${LOG_FILE}" || true
+        exit 1
+    fi
+    if [ "${elapsed}" -ge "${TIMEOUT}" ]; then
+        echo "Server did not start within ${TIMEOUT} seconds."
+        # Diagnostics are best effort and must not mask the exit below.
+        ps fx || true
+        cat "${LOG_FILE}" || true
+        exit 1
+    fi
+    echo "Waiting for Iggy server to start... ${elapsed}"
+    sleep 1
+    elapsed=$((elapsed + 1))
+done
+
+cd examples/java
+
+exit_code=0
+commands_run=0
+README_FILE="README.md"
+
+if [ -f "${README_FILE}" ]; then
+    # The command list is read on fd 3 so that Gradle and the examples cannot
+    # consume the remaining commands through stdin.
+    while IFS= read -r -u 3 example_cmd; do
+        # Remove Markdown backticks from the command
+        example_cmd="${example_cmd//\`/}"
+        commands_run=$((commands_run + 1))
+
+        echo -e "\e[33mChecking example command from ${README_FILE}:\e[0m ${example_cmd}"
+        echo ""
+
+        # NOTE(review): eval executes README text verbatim; this is only
+        # acceptable because the README is repository content.
+        eval "${example_cmd}" || exit_code=$?
+
+        if [ "${exit_code}" -ne 0 ]; then
+            echo ""
+            echo -e "\e[31mExample command failed:\e[0m ${example_cmd}"
+            echo ""
+            break
+        fi
+
+        # Small delay between runs to avoid thrashing the server
+        sleep 2
+    done 3< <(grep -E '^gradle' "${README_FILE}")
+
+    if [ "${commands_run}" -eq 0 ]; then
+        echo "No gradle commands found in ${README_FILE}."
+        exit_code=1
+    fi
+else
+    echo "README file ${README_FILE} not found in examples/java."
+    exit_code=1
+fi
+
+cd "${ROOT_WORKDIR}"
+
+# Terminate server
+stop_server
+
+if [ "${exit_code}" -eq 0 ]; then
+    echo "Test passed"
+else
+    echo "Test failed, see log file:"
+    if [ -e "${LOG_FILE}" ]; then
+        cat "${LOG_FILE}"
+    fi
+fi
+
+rm -f "${LOG_FILE}"
+
+exit "${exit_code}"