mmodzelewski commented on code in PR #2504: URL: https://github.com/apache/iggy/pull/2504#discussion_r2637415665
########## examples/java/gradle/wrapper/gradle-wrapper.jar: ########## Review Comment: Please remove the wrapper jar and all gradle wrapper related files. For now, we will stick to locally installed gradle versions, as having a compiled binary within the release package violates ASF rules. ########## examples/java/build.gradle.kts: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* Review Comment: please remove this auto-inserted comment ########## examples/java/src/main/java/org/apache/iggy/examples/multitenant/producer/MultiTenantProducer.java: ########## @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.examples.multitenant.producer; + +import org.apache.iggy.client.blocking.tcp.IggyTcpClient; +import org.apache.iggy.identifier.StreamId; +import org.apache.iggy.identifier.TopicId; +import org.apache.iggy.message.Message; +import org.apache.iggy.message.Partitioning; +import org.apache.iggy.stream.StreamDetails; +import org.apache.iggy.topic.CompressionAlgorithm; +import org.apache.iggy.topic.TopicDetails; +import org.apache.iggy.user.GlobalPermissions; +import org.apache.iggy.user.Permissions; +import org.apache.iggy.user.StreamPermissions; +import org.apache.iggy.user.UserInfo; +import org.apache.iggy.user.UserInfoDetails; +import org.apache.iggy.user.UserStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +public final class MultiTenantProducer { + + private static final Logger log = LoggerFactory.getLogger(MultiTenantProducer.class); + + private static final String[] TOPICS = {"events", "logs", "notifications"}; + private static final String PASSWORD = "secret"; + + private MultiTenantProducer() {} + + public static void main(String[] args) { + int tenantsCount = 3; + int producersCount = 3; + int partitionsCount = 3; + boolean ensureAccess = true; + long batchesLimit = 10L; + 
int messagesPerBatch = 1; + long intervalMs = 1L; + + String address = "127.0.0.1:8090"; + String rootUsername = "iggy"; + String rootPassword = "iggy"; + + log.info( + "Multi-tenant producer has started, tenants: {}, producers: {}, partitions: {}", + tenantsCount, + producersCount, + partitionsCount); + + HostAndPort hostAndPort = parseAddress(address); + IggyTcpClient rootClient = IggyTcpClient.builder() + .host(hostAndPort.host()) + .port(hostAndPort.port()) + .credentials(rootUsername, rootPassword) + .build(); + + log.info("Creating streams and users with permissions for each tenant"); + Map<String, String> streamsWithUsers = new HashMap<>(); + for (int i = 1; i <= tenantsCount; i++) { + String tenantPrefix = "tenant_" + i; + String stream = tenantPrefix + "_stream"; + String user = tenantPrefix + "_producer"; + createStreamAndUser(rootClient, stream, user); + streamsWithUsers.put(stream, user); + } + + log.info("Creating clients for each tenant"); + List<Tenant> tenants = new ArrayList<>(); + int tenantId = 1; + for (Map.Entry<String, String> entry : streamsWithUsers.entrySet()) { + IggyTcpClient client = IggyTcpClient.builder() + .host(hostAndPort.host()) + .port(hostAndPort.port()) + .credentials(entry.getValue(), PASSWORD) + .build(); + tenants.add(Tenant.newTenant(tenantId, entry.getKey(), entry.getValue(), client)); + tenantId++; + } + + if (ensureAccess) { + log.info("Ensuring access to streams for each tenant"); + for (Tenant tenant : tenants) { + List<String> unavailable = new ArrayList<>(); + for (Tenant other : tenants) { + if (!other.stream().equals(tenant.stream())) { + unavailable.add(other.stream()); + } + } + ensureStreamAccess(tenant.client(), tenant.stream(), unavailable); + } + } + + log.info("Creating {} producer(s) for each tenant", producersCount); + for (Tenant tenant : tenants) { + List<TenantProducer> producers = + createProducers(tenant.client(), producersCount, partitionsCount, tenant.stream(), TOPICS); + 
tenant.addProducers(producers); + log.info( + "Created {} producer(s) for tenant stream: {}, username: {}", + producersCount, + tenant.stream(), + tenant.user()); + } + + log.info("Starting {} producer(s) for each tenant", producersCount); + ExecutorService executor = Executors.newCachedThreadPool(); + List<Future<?>> tasks = new ArrayList<>(); + for (Tenant tenant : tenants) { + for (TenantProducer producer : tenant.producers()) { + tasks.add(executor.submit(() -> + sendBatches(tenant.id(), producer, batchesLimit, messagesPerBatch, intervalMs, TOPICS.length))); + } + } + + waitFor(tasks); + executor.shutdown(); + log.info("Disconnecting clients"); + } + + private static void sendBatches( + int tenantId, + TenantProducer producer, + long batchesCount, + int batchLength, + long intervalMs, + int topicsCount) { + long counter = 1; + long eventsId = 1; + long logsId = 1; + long notificationsId = 1; + + while (counter <= (long) topicsCount * batchesCount) { + long messageId; + String messagePrefix; + switch (producer.topic()) { + case "events" -> { + eventsId += 1; + messageId = eventsId; + messagePrefix = "event"; + } + case "logs" -> { + logsId += 1; + messageId = logsId; + messagePrefix = "log"; + } + case "notifications" -> { + notificationsId += 1; + messageId = notificationsId; + messagePrefix = "notification"; + } + default -> throw new IllegalStateException("Invalid topic"); + } + + List<Message> messages = new ArrayList<>(batchLength); + List<String> payloads = new ArrayList<>(batchLength); + for (int i = 0; i < batchLength; i++) { + String payload = messagePrefix + "-" + producer.id() + "-" + messageId; + messages.add(Message.of(payload)); + payloads.add(payload); Review Comment: looks like the payloads variable is never used after writing to it ########## examples/java/build.gradle.kts: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * This file was generated by the Gradle 'init' task. + * + * This is a general purpose Gradle build. + * Learn more about Gradle by exploring our Samples at https://docs.gradle.org/8.14.3/samples + */ + +plugins { + java + id("com.diffplug.spotless") version "8.1.0" + checkstyle +} + +dependencies { + implementation("org.apache.iggy:iggy:0.6.0") + implementation("org.slf4j:slf4j-simple:2.0.13") + implementation("com.fasterxml.jackson.core:jackson-databind:2.20.1") Review Comment: Jackson 3 is already available, please use it instead ########## examples/java/build.gradle.kts: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * This file was generated by the Gradle 'init' task. + * + * This is a general purpose Gradle build. + * Learn more about Gradle by exploring our Samples at https://docs.gradle.org/8.14.3/samples + */ + +plugins { + java + id("com.diffplug.spotless") version "8.1.0" + checkstyle +} + +dependencies { + implementation("org.apache.iggy:iggy:0.6.0") Review Comment: It'd be great if you changed this reference to the current version under `foreign/java`. This way we can keep the examples up to date and catch if they break for any reason. ########## examples/java/src/main/java/org/apache/iggy/examples/basic/consumer/BasicConsumer.java: ########## Review Comment: I think that it was discussed in comments that basic examples should be removed in favour of getting started ones. ########## examples/java/gradle.properties: ########## @@ -0,0 +1,5 @@ +# This file was generated by the Gradle 'init' task. Review Comment: please remove the comment ########## examples/java/settings.gradle.kts: ########## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * This file was generated by the Gradle 'init' task. Review Comment: please remove the comment ########## examples/java/gradle/libs.versions.toml: ########## Review Comment: as the file is not used in the examples and probably won't be needed as there's only one project, we can remove it completely ########## examples/java/README.md: ########## @@ -0,0 +1,140 @@ +# Iggy Java Examples + +This directory contains comprehensive sample applications that showcase various usage patterns of the Iggy java client SDK, from basic operations to advanced multi-tenant scenarios. + +## Running Examples + +Iggy requires valid credentials to authenticate client requests. The examples assume that the server is using the default root credentials, which can be enabled in one of two ways: + +1. Start the server with default credentials: + + ```bash + cargo run --bin iggy-server -- --with-default-root-credentials + ``` + +2. Set the appropriate environment variables before starting the server with `cargo run --bin iggy-server`: + + macOS/Linux: + + ```bash + export IGGY_ROOT_USERNAME=iggy + export IGGY_ROOT_PASSWORD=iggy + ``` + + Windows(Powershell): + + ```bash + $env:IGGY_ROOT_USERNAME = "iggy" + $env:IGGY_ROOT_PASSWORD = "iggy" + ``` + +> **Note** <br> +> This setup is intended only for development and testing, not production use. + +By default, all server data is stored in the `local_data` directory (this can be changed via `system.path` in `server.toml`). 
+ +Root credentials are applied **only on the very first startup**, when no data directory exists yet. +Once the server has created and populated the data directory, the existing stored credentials will always be used, and supplying the `--with-default-root-credentials` flag or setting the environment variables will no longer override them. + +If the server has already been started once and your example returns `Error: InvalidCredentials`, then this means the stored credentials differ from the defaults. + +You can reset the credentials in one of two ways: + +1. Delete the existing data directory, then start the server again with the default-credential flag or environment variables. +2. Use the `--fresh` flag to force a reset: + + ```bash + cargo run --bin iggy-server -- --with-default-root-credentials --fresh + ``` + + This will ignore any existing data directory and re-initialize it with the default credentials. + +For server configuration options and help: + +```bash +cargo run --bin iggy-server -- --help +``` + +You can also customize the server using environment variables: + +```bash +## Example: Enable HTTP transport and set custom address +IGGY_HTTP_ENABLED=true IGGY_TCP_ADDRESS=0.0.0.0:8090 cargo run --bin iggy-server +``` + +## Basic Examples + +### Getting Started + +A good introduction for newcomers to Iggy: + +```bash +./gradlew runGettingStartedProducer Review Comment: once you remove the gradle wrapper, please remember to update the references in the docs to use `gradle` ########## examples/java/README.md: ########## Review Comment: It would be good to mention versions of Java and Gradle that are required to run the examples ########## examples/java/src/main/java/org/apache/iggy/examples/messageenvelope/consumer/MessageEnvelopeConsumer.java: ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.examples.messageenvelope.consumer; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.apache.iggy.client.blocking.tcp.IggyTcpClient; +import org.apache.iggy.consumergroup.Consumer; +import org.apache.iggy.examples.shared.Messages; +import org.apache.iggy.examples.shared.Messages.Envelope; +import org.apache.iggy.examples.shared.Messages.OrderConfirmed; +import org.apache.iggy.examples.shared.Messages.OrderCreated; +import org.apache.iggy.examples.shared.Messages.OrderRejected; +import org.apache.iggy.identifier.StreamId; +import org.apache.iggy.identifier.TopicId; +import org.apache.iggy.message.Message; +import org.apache.iggy.message.PolledMessages; +import org.apache.iggy.message.PollingStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.nio.charset.StandardCharsets; +import java.util.Optional; + +public final class MessageEnvelopeConsumer { + private static final String STREAM_NAME = "envelope-stream"; + private static final StreamId STREAM_ID = StreamId.of(STREAM_NAME); + + private static final String TOPIC_NAME = "envelope-topic"; + 
private static final TopicId TOPIC_ID = TopicId.of(TOPIC_NAME); + + private static final long PARTITION_ID = 0L; + private static final int BATCHES_LIMIT = 10; + private static final long MESSAGES_PER_BATCH = 1L; + private static final long INTERVAL_MS = 1; + + private static final Logger log = LoggerFactory.getLogger(MessageEnvelopeConsumer.class); + + private static final ObjectMapper MAPPER = new ObjectMapper() + .registerModule(new JavaTimeModule()) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + private MessageEnvelopeConsumer() {} + + public static void main(final String[] args) { + var client = IggyTcpClient.builder() + .host("localhost") + .port(8090) + .credentials("iggy", "iggy") + .build(); + + consumeMessages(client); + } + + private static void consumeMessages(IggyTcpClient client) { + log.info( + "Messages will be consumed from stream: {}, topic: {}, partition: {} with interval {}ms.", + STREAM_ID, + TOPIC_ID, + PARTITION_ID, + INTERVAL_MS); + + BigInteger offset = BigInteger.ZERO; + int consumedBatches = 0; + + Consumer consumer = Consumer.of(0L); + + while (true) { + if (consumedBatches == BATCHES_LIMIT) { + log.info("Consumed {} batches of messages, exiting.", consumedBatches); + return; + } + + try { + PolledMessages polledMessages = client.messages() + .pollMessages( + STREAM_ID, + TOPIC_ID, + Optional.of(PARTITION_ID), + consumer, + PollingStrategy.offset(offset), + MESSAGES_PER_BATCH, + false); + + if (polledMessages.messages().isEmpty()) { + log.info("No messages found."); + Thread.sleep(INTERVAL_MS); + continue; + } + + for (Message message : polledMessages.messages()) { + handleMessage(message, offset); + } + + consumedBatches++; + + offset = offset.add(BigInteger.valueOf(polledMessages.messages().size())); + + Thread.sleep(INTERVAL_MS); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } catch (Exception e) { + log.error("Error polling messages", e); + break; + } + } + } + + 
private static void handleMessage(Message message, BigInteger offset) { + String json = new String(message.payload(), StandardCharsets.UTF_8); Review Comment: jackson can read directly from `bytes` so you don't need to create string first ########## examples/java/build.gradle.kts: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * This file was generated by the Gradle 'init' task. + * + * This is a general purpose Gradle build. 
+ * Learn more about Gradle by exploring our Samples at https://docs.gradle.org/8.14.3/samples + */ + +plugins { + java + id("com.diffplug.spotless") version "8.1.0" + checkstyle +} + +dependencies { + implementation("org.apache.iggy:iggy:0.6.0") + implementation("org.slf4j:slf4j-simple:2.0.13") + implementation("com.fasterxml.jackson.core:jackson-databind:2.20.1") + implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.20.1") +} + +repositories { + mavenCentral() + maven { url = uri("https://repository.apache.org/content/repositories/snapshots/") } +} + +spotless { + java { + palantirJavaFormat() + removeUnusedImports() + trimTrailingWhitespace() + endWithNewline() + formatAnnotations() + importOrder("", "\n", "javax|java", "\n", "\\#") + toggleOffOn() + } +} + +checkstyle { + toolVersion = "12.2.0" +} + +tasks.withType<JavaCompile> { + dependsOn("spotlessApply") +} + +tasks.register<JavaExec>("runGettingStartedProducer") { + classpath = sourceSets["main"].runtimeClasspath + mainClass.set("org.apache.iggy.examples.gettingstarted.producer.GettingStartedProducer") +} + +tasks.register<JavaExec>("runGettingStartedConsumer") { + classpath = sourceSets["main"].runtimeClasspath + mainClass.set("org.apache.iggy.examples.gettingstarted.consumer.GettingStartedConsumer") +} + +tasks.register<JavaExec>("runMessageHeadersProducer") { Review Comment: seems like these 2 scenarios for message headers do not exist, so you can remove the tasks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
