This is an automated email from the ASF dual-hosted git repository. maciej pushed a commit to branch java-clients-unification in repository https://gitbox.apache.org/repos/asf/iggy.git
commit d43791f138b2ec088ee861da3e8bab843de0feee Author: Maciej Modzelewski <[email protected]> AuthorDate: Mon Mar 16 15:25:51 2026 +0100 refactor(java): unify blocking TCP client as wrapper over async client The blocking TCP client had its own independent protocol implementation via InternalTcpClient using Reactor Netty. This duplicated all serialization, request/response handling, and connection logic already present in the async client. Replace InternalTcpClient with thin delegation to AsyncIggyTcpClient, resolving CompletableFutures via FutureUtil.resolve(). This eliminates ~900 lines of duplicated protocol code and the Reactor dependency, replacing it with direct Netty modules. --- foreign/java/BUILD_AND_TEST.md | 293 --------------------- foreign/java/gradle/libs.versions.toml | 12 +- foreign/java/java-sdk/build.gradle.kts | 6 +- .../async/tcp/AsyncIggyTcpClientBuilder.java | 2 +- .../blocking/tcp/ConsumerGroupsTcpClient.java | 53 +--- ...cpClient.java => ConsumerOffsetsTcpClient.java} | 29 +- .../iggy/client/blocking/tcp/FutureUtil.java | 51 ++++ .../iggy/client/blocking/tcp/IggyTcpClient.java | 95 +++---- .../client/blocking/tcp/IggyTcpClientBuilder.java | 59 ++--- .../client/blocking/tcp/InternalTcpClient.java | 172 ------------ .../client/blocking/tcp/MessagesTcpClient.java | 61 +---- .../client/blocking/tcp/PartitionsTcpClient.java | 21 +- .../tcp/PersonalAccessTokensTcpClient.java | 33 +-- .../iggy/client/blocking/tcp/StreamsTcpClient.java | 42 +-- .../iggy/client/blocking/tcp/SystemTcpClient.java | 27 +- .../iggy/client/blocking/tcp/TopicsTcpClient.java | 52 +--- .../iggy/client/blocking/tcp/UsersTcpClient.java | 94 ++----- .../iggy/client/blocking/tcp/package-info.java | 9 + 18 files changed, 207 insertions(+), 904 deletions(-) diff --git a/foreign/java/BUILD_AND_TEST.md b/foreign/java/BUILD_AND_TEST.md deleted file mode 100644 index 14c7e4cff..000000000 --- a/foreign/java/BUILD_AND_TEST.md +++ /dev/null @@ -1,293 +0,0 @@ -# Apache Iggy Java SDK - Build and Test Guide - -## Overview - -This document provides comprehensive instructions for building and testing the -Apache Iggy Java SDK, including the new async client implementation with fixed -polling functionality. - -## Prerequisites - -### Required Software - -- **Java**: JDK 17 or higher -- **Gradle**: Version 9.3+ (included via wrapper) -- **Iggy Server**: Running instance on localhost:8090 -- **Git**: For version control - -### Starting the Iggy Server - -```bash -# Navigate to Iggy server directory -cd /path/to/iggy - -READ the README.md and start iggy-server as prescribed there. - -## Building the Project - -### 1. Clean Build - -```bash -cd iggy/foreign/java/java-sdk - -# Clean previous builds -./gradlew clean - -# Build without running tests -./gradlew build -x test - -# Or build with tests (skip checkstyle) -./gradlew build -x checkstyleMain -x checkstyleTest -``` - -### 2. Compile Only - -```bash -# Compile main source code -./gradlew compileJava - -# Compile test code -./gradlew compileTestJava -``` - -## Running Tests - -### 1. Run All Tests - -```bash -# Run all tests (skip checkstyle) -./gradlew test -x checkstyleMain -x checkstyleTest -``` - -### 2. Run Specific Test Class - -```bash -# Run AsyncPollMessageTest specifically -./gradlew test --tests AsyncPollMessageTest -x checkstyleMain -x checkstyleTest - -# Run with detailed output -./gradlew test --tests AsyncPollMessageTest --info -x checkstyleMain -x checkstyleTest -``` - -### 3. Run Test Suite by Package - -```bash -# Run all async client tests -./gradlew test --tests "org.apache.iggy.client.async.*" -x checkstyleMain -x checkstyleTest -``` - -### 4. View Test Results - -Test reports are generated at: - -```bash -java-sdk/build/reports/tests/test/index.html -``` - -Open this file in a browser to view detailed test results. - -## Key Test: AsyncPollMessageTest - -This test validates the async client's polling functionality and demonstrates important behaviors: - -### Test Coverage - -1. **Null Consumer Handling** - Confirms server timeout (3 seconds) when polling with null consumer -2. **Consumer ID Validation** - Tests various consumer IDs (0, 1, 99999) -3. **Polling Strategies** - Validates FIRST, OFFSET, and LAST strategies -4. **Connection Recovery** - Handles reconnection after timeouts -5. **Message Retrieval** - Verifies successful message polling - -### Running the AsyncPollMessageTest - -```bash -# Run with full output -cd iggy/foreign/java/java-sdk -./gradlew test --tests AsyncPollMessageTest -x checkstyleMain -x checkstyleTest --info -``` - -Expected output: - -- All 5 tests should pass (100% success rate) -- Test 1 will take ~3 seconds due to null consumer timeout -- Other tests complete quickly (<1 second each) - -## Key Components - -### Async Client Classes - -1. **AsyncIggyTcpClient** (`client/async/tcp/AsyncIggyTcpClient.java`) - - Main entry point for async operations - - Manages TCP connection via Netty - - Provides access to subsystem clients - -2. **AsyncMessagesTcpClient** (`client/async/tcp/AsyncMessagesTcpClient.java`) - - Handles message operations (send/poll) - - Fixed partition ID encoding (4-byte little-endian) - - Proper null consumer handling - -3. **AsyncTopicsTcpClient** (`client/async/tcp/AsyncTopicsTcpClient.java`) - - Topic management operations - - Create, update, delete topics - - Retrieve topic information - -4. **AsyncStreamsTcpClient** (`client/async/tcp/AsyncStreamsTcpClient.java`) - - Stream management operations - - Create, update, delete streams - -## Important Fixes Applied - -### 1. Partition ID Encoding Fix - -**File**: `AsyncMessagesTcpClient.java` - -```java -// Correct: 4-byte little-endian encoding -payload.writeIntLE(partitionId.orElse(0L).intValue()); -``` - -### 2. Null Consumer Serialization - -**File**: `AsyncBytesSerializer.java` - -```java -if (consumer == null) { - buffer.writeByte(0); // Consumer type: 0 for null - buffer.writeIntLE(0); // Empty identifier (4 bytes) -} -``` - -### 3. Connection Recovery - -**File**: `AsyncPollMessageTest.java` - -- Implements `@BeforeEach` to ensure connection before each test -- Handles reconnection after timeout failures - -## Troubleshooting - -### Common Issues - -1. **Connection Refused** - - Ensure Iggy server is running on `127.0.0.1:8090` - - Check server logs for errors - -2. **Timeout Errors** - - Expected for null consumer tests (3-second timeout) - - For other tests, check network connectivity - -3. **Build Failures** - - Run `./gradlew clean` before building - - Ensure Java 17+ is installed - - Check `JAVA_HOME` environment variable - -4. **Checkstyle Violations** - - Use `-x checkstyleMain -x checkstyleTest` to skip style checks - - Or fix violations shown in `build/reports/checkstyle/` - -### Debug Output - -To enable debug output in tests: - -```java -// Debug output is already included in AsyncMessagesTcpClient -// Look for lines starting with "DEBUG:" in test output -``` - -## Test Metrics - -### Expected Success Rates - -- **AsyncPollMessageTest**: 100% (5/5 tests) -- **Overall Test Suite**: ~94% (70/74 tests) - - Some AsyncClientIntegrationTest tests may fail due to connection handling - -### Performance Benchmarks - -- Message sending: <10ms per batch -- Message polling: <5ms (except null consumer: 3000ms timeout) -- Connection establishment: ~250ms - -## Development Workflow - -### 1. Make Changes - -```bash -# Edit source files -vim src/main/java/org/apache/iggy/client/async/... -``` - -### 2. Compile - -```bash -./gradlew compileJava -``` - -### 3. Test - -```bash -# Run specific test -./gradlew test --tests AsyncPollMessageTest -x checkstyleMain -x checkstyleTest - -# Or run all async tests -./gradlew test --tests "org.apache.iggy.client.async.*" -x checkstyleMain -x checkstyleTest -``` - -### 4. View Results - -```bash -# Open test report in browser -open build/reports/tests/test/index.html -``` - -## Integration with IDE - -### IntelliJ IDEA - -1. Import as Gradle project -2. Set Java SDK to 17+ -3. Run tests directly from IDE - -### VS Code - -1. Install Java Extension Pack -2. Open folder containing `build.gradle` -3. Use Test Explorer for running tests - -## Contributing - -When contributing changes: - -1. **Always compile after changes** - - ```bash - ./gradlew compileJava compileTestJava - ``` - -2. **Run relevant tests** - - ```bash - ./gradlew test --tests "*Test" -x checkstyleMain -x checkstyleTest - ``` - -3. **Check for warnings** - - ```bash - ./gradlew build 2>&1 | grep -i warning - ``` - -4. **Clean up test files** - - Remove temporary test classes - - Delete debug output before committing - -## Summary - -The Apache Iggy Java SDK async client is fully functional with: - -- ✅ Fixed partition ID encoding -- ✅ Proper null consumer handling -- ✅ Connection recovery mechanisms -- ✅ Comprehensive test coverage -- ✅ All polling strategies working - -For questions or issues, refer to the test output and debug messages for detailed diagnostics. diff --git a/foreign/java/gradle/libs.versions.toml b/foreign/java/gradle/libs.versions.toml index 108b5909f..9222be7a9 100644 --- a/foreign/java/gradle/libs.versions.toml +++ b/foreign/java/gradle/libs.versions.toml @@ -32,10 +32,6 @@ commons-lang3 = "3.20.0" # HTTP Client httpclient5 = "5.5.1" -# Reactor -reactor-core = "3.8.0" -reactor-netty = "1.3.0" - # Logging slf4j = "2.0.17" logback = "1.5.21" @@ -74,10 +70,6 @@ httpclient5 = { module = "org.apache.httpcomponents.client5:httpclient5", versio # Apache Commons commons-lang3 = { module = "org.apache.commons:commons-lang3", version.ref = "commons-lang3" } -# Reactor -reactor-core = { module = "io.projectreactor:reactor-core", version.ref = "reactor-core" } -reactor-netty-core = { module = "io.projectreactor.netty:reactor-netty-core", version.ref = "reactor-netty" } - # Logging slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" } slf4j-simple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" } @@ -99,6 +91,10 @@ testcontainers = { module = "org.testcontainers:testcontainers", version.ref = " testcontainers-junit = { module = "org.testcontainers:testcontainers-junit-jupiter", version.ref = "testcontainers" } # Netty +netty-buffer = { module = "io.netty:netty-buffer", version.ref = "netty" } +netty-transport = { module = "io.netty:netty-transport", version.ref = "netty" } +netty-handler = { module = "io.netty:netty-handler", version.ref = "netty" } +netty-codec = { module = "io.netty:netty-codec", version.ref = "netty" } netty-dns-macos = { module = "io.netty:netty-resolver-dns-native-macos", version.ref = "netty" } # Spotbugs diff --git a/foreign/java/java-sdk/build.gradle.kts b/foreign/java/java-sdk/build.gradle.kts index defee3100..688680e68 100644 --- a/foreign/java/java-sdk/build.gradle.kts +++ b/foreign/java/java-sdk/build.gradle.kts @@ -45,8 +45,10 @@ dependencies { implementation(libs.commons.lang3) implementation(libs.slf4j.api) implementation(libs.spotbugs.annotations) - implementation(libs.reactor.core) - implementation(libs.reactor.netty.core) + implementation(libs.netty.buffer) + implementation(libs.netty.transport) + implementation(libs.netty.handler) + implementation(libs.netty.codec) testImplementation(libs.testcontainers) testImplementation(libs.testcontainers.junit) testImplementation(platform(libs.junit.bom)) diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java index 1db799094..88264103a 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClientBuilder.java @@ -75,7 +75,7 @@ public final class AsyncIggyTcpClientBuilder { private RetryPolicy retryPolicy; private Duration acquireTimeout; - AsyncIggyTcpClientBuilder() {} + public AsyncIggyTcpClientBuilder() {} /** * Sets the host address for the Iggy server. diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerGroupsTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerGroupsTcpClient.java index 7868d1dfb..b69416d1c 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerGroupsTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerGroupsTcpClient.java @@ -19,84 +19,51 @@ package org.apache.iggy.client.blocking.tcp; -import io.netty.buffer.Unpooled; import org.apache.iggy.client.blocking.ConsumerGroupsClient; import org.apache.iggy.consumergroup.ConsumerGroup; import org.apache.iggy.consumergroup.ConsumerGroupDetails; import org.apache.iggy.identifier.ConsumerId; import org.apache.iggy.identifier.StreamId; import org.apache.iggy.identifier.TopicId; -import org.apache.iggy.serde.BytesDeserializer; -import org.apache.iggy.serde.BytesSerializer; -import org.apache.iggy.serde.CommandCode; import java.util.List; import java.util.Optional; -import static org.apache.iggy.serde.BytesSerializer.toBytes; +final class ConsumerGroupsTcpClient implements ConsumerGroupsClient { -class ConsumerGroupsTcpClient implements ConsumerGroupsClient { + private final org.apache.iggy.client.async.ConsumerGroupsClient delegate; - private final InternalTcpClient tcpClient; - - public ConsumerGroupsTcpClient(InternalTcpClient tcpClient) { - this.tcpClient = tcpClient; + ConsumerGroupsTcpClient(org.apache.iggy.client.async.ConsumerGroupsClient delegate) { + this.delegate = delegate; } @Override public Optional<ConsumerGroupDetails> getConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId groupId) { - var payload = toBytes(streamId); - payload.writeBytes(toBytes(topicId)); - payload.writeBytes(toBytes(groupId)); - return tcpClient.exchangeForOptional( - CommandCode.ConsumerGroup.GET, payload, BytesDeserializer::readConsumerGroupDetails); + return FutureUtil.resolve(delegate.getConsumerGroup(streamId, topicId, groupId)); } @Override public List<ConsumerGroup> getConsumerGroups(StreamId streamId, TopicId topicId) { - var payload = toBytes(streamId); - payload.writeBytes(toBytes(topicId)); - return tcpClient.exchangeForList( - CommandCode.ConsumerGroup.GET_ALL, payload, BytesDeserializer::readConsumerGroup); + return FutureUtil.resolve(delegate.getConsumerGroups(streamId, topicId)); } @Override public ConsumerGroupDetails createConsumerGroup(StreamId streamId, TopicId topicId, String name) { - var streamIdBytes = toBytes(streamId); - var topicIdBytes = toBytes(topicId); - var payload = Unpooled.buffer(1 + streamIdBytes.readableBytes() + topicIdBytes.readableBytes() + name.length()); - - payload.writeBytes(streamIdBytes); - payload.writeBytes(topicIdBytes); - payload.writeBytes(BytesSerializer.toBytes(name)); - - return tcpClient.exchangeForEntity( - CommandCode.ConsumerGroup.CREATE, payload, BytesDeserializer::readConsumerGroupDetails); + return FutureUtil.resolve(delegate.createConsumerGroup(streamId, topicId, name)); } @Override public void deleteConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId groupId) { - var payload = toBytes(streamId); - payload.writeBytes(toBytes(topicId)); - payload.writeBytes(toBytes(groupId)); - tcpClient.send(CommandCode.ConsumerGroup.DELETE, payload); + FutureUtil.resolve(delegate.deleteConsumerGroup(streamId, topicId, groupId)); } @Override public void joinConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId groupId) { - var payload = toBytes(streamId); - payload.writeBytes(toBytes(topicId)); - payload.writeBytes(toBytes(groupId)); - - tcpClient.send(CommandCode.ConsumerGroup.JOIN, payload); + FutureUtil.resolve(delegate.joinConsumerGroup(streamId, topicId, groupId)); } @Override public void leaveConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId groupId) { - var payload = toBytes(streamId); - payload.writeBytes(toBytes(topicId)); - payload.writeBytes(toBytes(groupId)); - - tcpClient.send(CommandCode.ConsumerGroup.LEAVE, payload); + FutureUtil.resolve(delegate.leaveConsumerGroup(streamId, topicId, groupId)); } } diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerOffsetTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerOffsetsTcpClient.java similarity index 58% rename from foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerOffsetTcpClient.java rename to foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerOffsetsTcpClient.java index 08ddd5f3b..03409d15d 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerOffsetTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerOffsetsTcpClient.java @@ -24,44 +24,27 @@ import org.apache.iggy.consumergroup.Consumer; import org.apache.iggy.consumeroffset.ConsumerOffsetInfo; import org.apache.iggy.identifier.StreamId; import org.apache.iggy.identifier.TopicId; -import org.apache.iggy.serde.BytesDeserializer; -import org.apache.iggy.serde.CommandCode; import java.math.BigInteger; import java.util.Optional; -import static org.apache.iggy.serde.BytesSerializer.toBytes; -import static org.apache.iggy.serde.BytesSerializer.toBytesAsU64; +final class ConsumerOffsetsTcpClient implements ConsumerOffsetsClient { -class ConsumerOffsetTcpClient implements ConsumerOffsetsClient { + private final org.apache.iggy.client.async.ConsumerOffsetsClient delegate; - private final InternalTcpClient tcpClient; - - public ConsumerOffsetTcpClient(InternalTcpClient tcpClient) { - this.tcpClient = tcpClient; + ConsumerOffsetsTcpClient(org.apache.iggy.client.async.ConsumerOffsetsClient delegate) { + this.delegate = delegate; } @Override public void storeConsumerOffset( StreamId streamId, TopicId topicId, Optional<Long> partitionId, Consumer consumer, BigInteger offset) { - var payload = toBytes(consumer); - payload.writeBytes(toBytes(streamId)); - payload.writeBytes(toBytes(topicId)); - payload.writeBytes(toBytes(partitionId)); - payload.writeBytes(toBytesAsU64(offset)); - - tcpClient.send(CommandCode.ConsumerOffset.STORE, payload); + FutureUtil.resolve(delegate.storeConsumerOffset(streamId, topicId, partitionId, consumer, offset)); } @Override public Optional<ConsumerOffsetInfo> getConsumerOffset( StreamId streamId, TopicId topicId, Optional<Long> partitionId, Consumer consumer) { - var payload = toBytes(consumer); - payload.writeBytes(toBytes(streamId)); - payload.writeBytes(toBytes(topicId)); - payload.writeBytes(toBytes(partitionId)); - - return tcpClient.exchangeForOptional( - CommandCode.ConsumerOffset.GET, payload, BytesDeserializer::readConsumerOffsetInfo); + return FutureUtil.resolve(delegate.getConsumerOffset(streamId, topicId, partitionId, consumer)); } } diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/FutureUtil.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/FutureUtil.java new file mode 100644 index 000000000..3dfa7e276 --- /dev/null +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/FutureUtil.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iggy.client.blocking.tcp; + +import org.apache.iggy.exception.IggyClientException; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +final class FutureUtil { + + private FutureUtil() {} + + static <T> T resolve(CompletableFuture<T> future) { + try { + return future.join(); + } catch (CompletionException e) { + throw unwrap(e.getCause()); + } catch (CancellationException e) { + throw new IggyClientException("Operation was cancelled", e); + } + } + + private static RuntimeException unwrap(Throwable cause) { + if (cause instanceof RuntimeException re) { + return re; + } + if (cause instanceof Error err) { + throw err; + } + return new IggyClientException("Unexpected exception", cause); + } +} diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClient.java index 1f150d328..e3b8c03d8 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClient.java @@ -19,6 +19,7 @@ package org.apache.iggy.client.blocking.tcp; +import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient; import org.apache.iggy.client.blocking.ConsumerGroupsClient; import org.apache.iggy.client.blocking.ConsumerOffsetsClient; import org.apache.iggy.client.blocking.IggyBaseClient; @@ -29,73 +30,48 @@ import org.apache.iggy.client.blocking.StreamsClient; import org.apache.iggy.client.blocking.SystemClient; import org.apache.iggy.client.blocking.TopicsClient; import org.apache.iggy.client.blocking.UsersClient; -import org.apache.iggy.config.RetryPolicy; import org.apache.iggy.exception.IggyMissingCredentialsException; import org.apache.iggy.exception.IggyNotConnectedException; import org.apache.iggy.user.IdentityInfo; -import java.io.File; -import java.time.Duration; -import java.util.Optional; - -public class IggyTcpClient implements IggyBaseClient { - - private final String host; - private final int port; - private final boolean enableTls; - private final Optional<File> tlsCertificate; - private final Optional<String> username; - private final Optional<String> password; - - private UsersTcpClient usersClient; - private StreamsTcpClient streamsClient; - private TopicsTcpClient topicsClient; - private PartitionsTcpClient partitionsClient; - private ConsumerGroupsTcpClient consumerGroupsClient; - private ConsumerOffsetTcpClient consumerOffsetsClient; - private MessagesTcpClient messagesClient; - private SystemTcpClient systemClient; - private PersonalAccessTokensTcpClient personalAccessTokensClient; +import java.io.Closeable; + +public class IggyTcpClient implements IggyBaseClient, Closeable { + + private final AsyncIggyTcpClient asyncClient; + + private UsersClient usersClient; + private StreamsClient streamsClient; + private TopicsClient topicsClient; + private PartitionsClient partitionsClient; + private ConsumerGroupsClient consumerGroupsClient; + private ConsumerOffsetsClient consumerOffsetsClient; + private MessagesClient messagesClient; + private SystemClient systemClient; + private PersonalAccessTokensClient personalAccessTokensClient; public IggyTcpClient(String host, Integer port) { - this(host, port, null, null, null, null, null, null, false, Optional.empty()); + this(new AsyncIggyTcpClient(host, port)); } - @SuppressWarnings("checkstyle:ParameterNumber") - IggyTcpClient( - String host, - Integer port, - String username, - String password, - Duration connectionTimeout, - Duration requestTimeout, - Integer connectionPoolSize, - RetryPolicy retryPolicy, - boolean enableTls, - Optional<File> tlsCertificate) { - this.host = host; - this.port = port; - this.username = Optional.ofNullable(username); - this.password = Optional.ofNullable(password); - this.enableTls = enableTls; - this.tlsCertificate = tlsCertificate; + IggyTcpClient(AsyncIggyTcpClient asyncClient) { + this.asyncClient = asyncClient; } /** * Connects to the Iggy server. */ public void connect() { - InternalTcpClient tcpClient = new InternalTcpClient(host, port, enableTls, tlsCertificate); - tcpClient.connect(); - usersClient = new UsersTcpClient(tcpClient); - streamsClient = new StreamsTcpClient(tcpClient); - topicsClient = new TopicsTcpClient(tcpClient); - partitionsClient = new PartitionsTcpClient(tcpClient); - consumerGroupsClient = new ConsumerGroupsTcpClient(tcpClient); - consumerOffsetsClient = new ConsumerOffsetTcpClient(tcpClient); - messagesClient = new MessagesTcpClient(tcpClient); - systemClient = new SystemTcpClient(tcpClient); - personalAccessTokensClient = new PersonalAccessTokensTcpClient(tcpClient); + FutureUtil.resolve(asyncClient.connect()); + usersClient = new UsersTcpClient(asyncClient.users()); + streamsClient = new StreamsTcpClient(asyncClient.streams()); + topicsClient = new TopicsTcpClient(asyncClient.topics()); + partitionsClient = new PartitionsTcpClient(asyncClient.partitions()); + consumerGroupsClient = new ConsumerGroupsTcpClient(asyncClient.consumerGroups()); + consumerOffsetsClient = new ConsumerOffsetsTcpClient(asyncClient.consumerOffsets()); + messagesClient = new MessagesTcpClient(asyncClient.messages()); + systemClient = new SystemTcpClient(asyncClient.system()); + personalAccessTokensClient = new PersonalAccessTokensTcpClient(asyncClient.personalAccessTokens()); } /** @@ -114,13 +90,12 @@ public class IggyTcpClient implements IggyBaseClient { * @throws IggyNotConnectedException if client is not connected */ public IdentityInfo login() { - if (usersClient == null) { - throw new IggyNotConnectedException(); - } - if (username.isEmpty() || password.isEmpty()) { - throw new IggyMissingCredentialsException(); - } - return usersClient.login(username.get(), password.get()); + return FutureUtil.resolve(asyncClient.login()); + } + + @Override + public void close() { + FutureUtil.resolve(asyncClient.close()); } @Override diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClientBuilder.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClientBuilder.java index c5b9c39ac..ab8df4b9b 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClientBuilder.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/IggyTcpClientBuilder.java @@ -19,14 +19,13 @@ package org.apache.iggy.client.blocking.tcp; -import org.apache.commons.lang3.StringUtils; +import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient; +import org.apache.iggy.client.async.tcp.AsyncIggyTcpClientBuilder; import org.apache.iggy.config.RetryPolicy; -import org.apache.iggy.exception.IggyInvalidArgumentException; import org.apache.iggy.exception.IggyMissingCredentialsException; import java.io.File; import java.time.Duration; -import java.util.Optional; /** * Builder for creating configured IggyTcpClient instances. @@ -60,16 +59,10 @@ import java.util.Optional; * @see IggyTcpClient#builder() */ public final class IggyTcpClientBuilder { - private String host = "localhost"; - private Integer port = 8090; + + private final AsyncIggyTcpClientBuilder asyncBuilder = new AsyncIggyTcpClientBuilder(); private String username; private String password; - private Duration connectionTimeout; - private Duration requestTimeout; - private Integer connectionPoolSize; - private RetryPolicy retryPolicy; - private boolean enableTls = false; - private File tlsCertificate; IggyTcpClientBuilder() {} @@ -80,7 +73,7 @@ public final class IggyTcpClientBuilder { * @return this builder */ public IggyTcpClientBuilder host(String host) { - this.host = host; + asyncBuilder.host(host); return this; } @@ -91,7 +84,7 @@ public final class IggyTcpClientBuilder { * @return this builder */ public IggyTcpClientBuilder port(Integer port) { - this.port = port; + asyncBuilder.port(port); return this; } @@ -106,6 +99,7 @@ public final class IggyTcpClientBuilder { public IggyTcpClientBuilder credentials(String username, String password) { this.username = username; this.password = password; + asyncBuilder.credentials(username, password); return this; } @@ -116,7 +110,7 @@ public final class IggyTcpClientBuilder { * @return this builder */ public IggyTcpClientBuilder connectionTimeout(Duration connectionTimeout) { - this.connectionTimeout = connectionTimeout; + asyncBuilder.connectionTimeout(connectionTimeout); return this; } @@ -127,7 +121,7 @@ public final class IggyTcpClientBuilder { * @return this builder */ public IggyTcpClientBuilder requestTimeout(Duration requestTimeout) { - this.requestTimeout = requestTimeout; + asyncBuilder.requestTimeout(requestTimeout); return this; } @@ -138,7 +132,7 @@ public final class IggyTcpClientBuilder { * @return this builder */ public IggyTcpClientBuilder connectionPoolSize(Integer connectionPoolSize) { - this.connectionPoolSize = connectionPoolSize; + asyncBuilder.connectionPoolSize(connectionPoolSize); return this; } @@ -149,7 +143,7 @@ public final class IggyTcpClientBuilder { * @return this builder */ public IggyTcpClientBuilder retryPolicy(RetryPolicy retryPolicy) { - this.retryPolicy = retryPolicy; + asyncBuilder.retryPolicy(retryPolicy); return this; } @@ -160,7 +154,7 @@ public final class IggyTcpClientBuilder { * @return this builder */ public IggyTcpClientBuilder tls(boolean enableTls) { - this.enableTls = enableTls; + asyncBuilder.tls(enableTls); return this; } @@ -170,7 +164,7 @@ public final class IggyTcpClientBuilder { * @return this builder */ public IggyTcpClientBuilder enableTls() { - this.enableTls = true; + asyncBuilder.enableTls(); return this; } @@ -181,7 +175,7 @@ public final class IggyTcpClientBuilder { * @return this builder */ public IggyTcpClientBuilder tlsCertificate(File tlsCertificate) { - this.tlsCertificate = tlsCertificate; + asyncBuilder.tlsCertificate(tlsCertificate); return this; } @@ -192,7 +186,7 @@ public final class IggyTcpClientBuilder { * @return this builder */ public IggyTcpClientBuilder tlsCertificate(String tlsCertificatePath) { - this.tlsCertificate = StringUtils.isBlank(tlsCertificatePath) ? null : new File(tlsCertificatePath); + asyncBuilder.tlsCertificate(tlsCertificatePath); return this; } @@ -201,26 +195,11 @@ public final class IggyTcpClientBuilder { * Note: You still need to call {@link IggyTcpClient#connect()} on the returned client. * * @return a new IggyTcpClient instance - * @throws IggyInvalidArgumentException if the host is null or empty, or if the port is not positive + * @throws org.apache.iggy.exception.IggyInvalidArgumentException if the host is null or empty, or if the port is not positive */ public IggyTcpClient build() { - if (host == null || host.isEmpty()) { - throw new IggyInvalidArgumentException("Host cannot be null or empty"); - } - if (port == null || port <= 0) { - throw new IggyInvalidArgumentException("Port must be a positive integer"); - } - return new IggyTcpClient( - host, - port, - username, - password, - connectionTimeout, - requestTimeout, - connectionPoolSize, - retryPolicy, - this.enableTls, - Optional.ofNullable(tlsCertificate)); + AsyncIggyTcpClient asyncClient = asyncBuilder.build(); + return new IggyTcpClient(asyncClient); } /** @@ -230,7 +209,7 @@ public final class IggyTcpClientBuilder { * * @return a new IggyTcpClient instance that is connected and logged in * @throws IggyMissingCredentialsException if no credentials were provided - * @throws IggyInvalidArgumentException if the host is null or empty, or if the port is not positive + * @throws org.apache.iggy.exception.IggyInvalidArgumentException if the host is null or empty, or if the port is not positive */ public IggyTcpClient buildAndLogin() { if (username == null || password == null) { diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java deleted file mode 100644 index 944309937..000000000 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iggy.client.blocking.tcp; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageDecoder; -import io.netty.handler.ssl.SslContext; -import io.netty.handler.ssl.SslContextBuilder; -import org.apache.iggy.exception.IggyConnectionException; -import org.apache.iggy.exception.IggyEmptyResponseException; -import org.apache.iggy.exception.IggyServerException; -import org.apache.iggy.exception.IggyTlsException; -import org.apache.iggy.serde.CommandCode; -import reactor.core.publisher.Mono; -import reactor.netty.Connection; -import reactor.netty.tcp.TcpClient; - -import javax.net.ssl.SSLException; -import java.io.File; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.function.Function; - -final class InternalTcpClient { - - private static final int REQUEST_INITIAL_BYTES_LENGTH = 4; - private static final int COMMAND_LENGTH = 4; - private static final int RESPONSE_INITIAL_BYTES_LENGTH = 8; - - private final TcpClient client; - private final BlockingQueue<IggyResponse> responses = new LinkedBlockingQueue<>(); - private Connection connection; - - InternalTcpClient(String host, Integer port) { - this(host, port, false, Optional.empty()); - } - - InternalTcpClient(String host, Integer port, boolean enableTls, Optional<File> tlsCertificate) { - TcpClient tcpClient = TcpClient.create() - .host(host) - .port(port) - .doOnConnected(conn -> conn.addHandlerLast(new IggyResponseDecoder())); - - if (enableTls) { - try { - SslContextBuilder builder = SslContextBuilder.forClient(); - tlsCertificate.ifPresent(builder::trustManager); - SslContext sslContext = builder.build(); - tcpClient = tcpClient.secure(sslSpec -> sslSpec.sslContext(sslContext)); - } catch (SSLException e) { - throw new IggyTlsException("Failed to configure TLS for TcpClient", e); - } - } - - client = tcpClient; - } - - void connect() { - this.connection = client.connectNow(); - this.connection.inbound().receiveObject().ofType(IggyResponse.class).subscribe(responses::add); - } - - <T> List<T> exchangeForList(CommandCode command, Function<ByteBuf, T> responseMapper) { - return exchangeForList(command, Unpooled.EMPTY_BUFFER, responseMapper); - } - - <T> List<T> exchangeForList(CommandCode command, ByteBuf payload, Function<ByteBuf, T> responseMapper) { - return exchange(command, payload, response -> { - List<T> list = new ArrayList<>(); - var buf = response.payload(); - while (buf.isReadable()) { - list.add(responseMapper.apply(buf)); - } - return list; - }); - } - - <T> Optional<T> exchangeForOptional(CommandCode command, ByteBuf payload, Function<ByteBuf, T> responseMapper) { - return exchange(command, payload, response -> { - ByteBuf buf = response.payload(); - if (!buf.isReadable()) { - return Optional.empty(); - } - return Optional.ofNullable(responseMapper.apply(buf)); - }); - } - - <T> T exchangeForEntity(CommandCode command, ByteBuf payload, Function<ByteBuf, T> responseMapper) { - return exchange(command, payload, response -> { - if (!response.payload().isReadable()) { - throw new IggyEmptyResponseException(command.toString()); - } - return responseMapper.apply(response.payload()); - }); - } - - void send(CommandCode command) { - send(command, Unpooled.EMPTY_BUFFER); - } - - void send(CommandCode command, ByteBuf payload) { - exchange(command, payload, response -> null); - } - - private <T> T exchange(CommandCode command, ByteBuf payload, Function<IggyResponse, T> responseMapper) { - var payloadSize = payload.readableBytes() + COMMAND_LENGTH; - var buffer = Unpooled.buffer(REQUEST_INITIAL_BYTES_LENGTH + payloadSize); - buffer.writeIntLE(payloadSize); - buffer.writeIntLE(command.getValue()); - buffer.writeBytes(payload); - - connection.outbound().send(Mono.just(buffer)).then().block(); - try { - IggyResponse response = responses.take(); - if (response.status() != 0) { - byte[] errorPayload = new byte[response.payload().readableBytes()]; - response.payload().readBytes(errorPayload); - response.payload().release(); - throw IggyServerException.fromTcpResponse(response.status(), errorPayload); - } - try { - return responseMapper.apply(response); - } finally { - response.payload().release(); - } - } catch (InterruptedException e) { - throw new IggyConnectionException("Interrupted while waiting for response", e); - } - } - - static class IggyResponseDecoder extends ByteToMessageDecoder { - @Override - protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) { - if (byteBuf.readableBytes() < RESPONSE_INITIAL_BYTES_LENGTH) { - return; - } - byteBuf.markReaderIndex(); - var status = byteBuf.readUnsignedIntLE(); - var responseLength = byteBuf.readUnsignedIntLE(); - if (byteBuf.readableBytes() < responseLength) { - byteBuf.resetReaderIndex(); - return; - } - var length = Long.valueOf(responseLength).intValue(); - list.add(new IggyResponse(status, length, byteBuf.readBytes(length))); - } - } - - record IggyResponse(long status, int length, ByteBuf payload) {} -} diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java index 84e251d26..2ec3ed2fe 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java @@ -19,7 +19,6 @@ package org.apache.iggy.client.blocking.tcp; -import io.netty.buffer.Unpooled; import org.apache.iggy.client.blocking.MessagesClient; import org.apache.iggy.consumergroup.Consumer; import org.apache.iggy.identifier.StreamId; @@ -28,20 +27,16 @@ import org.apache.iggy.message.Message; import org.apache.iggy.message.Partitioning; import org.apache.iggy.message.PolledMessages; import org.apache.iggy.message.PollingStrategy; -import org.apache.iggy.serde.BytesDeserializer; -import org.apache.iggy.serde.CommandCode; import java.util.List; import java.util.Optional; -import static org.apache.iggy.serde.BytesSerializer.toBytes; +final class MessagesTcpClient implements MessagesClient { -class MessagesTcpClient implements MessagesClient { + private final org.apache.iggy.client.async.MessagesClient delegate; - private final InternalTcpClient tcpClient; - - public MessagesTcpClient(InternalTcpClient tcpClient) { - this.tcpClient = tcpClient; + MessagesTcpClient(org.apache.iggy.client.async.MessagesClient delegate) { + this.delegate = delegate; } @Override @@ -53,54 +48,12 @@ class MessagesTcpClient implements MessagesClient { PollingStrategy strategy, Long count, boolean autoCommit) { - var payload = toBytes(consumer); - payload.writeBytes(toBytes(streamId)); - payload.writeBytes(toBytes(topicId)); - payload.writeBytes(toBytes(partitionId)); - payload.writeBytes(toBytes(strategy)); - payload.writeIntLE(count.intValue()); - payload.writeByte(autoCommit ? 1 : 0); - - return tcpClient.exchangeForEntity(CommandCode.Messages.POLL, payload, BytesDeserializer::readPolledMessages); + return FutureUtil.resolve( + delegate.pollMessages(streamId, topicId, partitionId, consumer, strategy, count, autoCommit)); } @Override public void sendMessages(StreamId streamId, TopicId topicId, Partitioning partitioning, List<Message> messages) { - // Length of streamId, topicId, partitioning and messages count (4 bytes) - var metadataLength = streamId.getSize() + topicId.getSize() + partitioning.getSize() + 4; - var payload = Unpooled.buffer(4 + metadataLength); - payload.writeIntLE(metadataLength); - payload.writeBytes(toBytes(streamId)); - payload.writeBytes(toBytes(topicId)); - payload.writeBytes(toBytes(partitioning)); - payload.writeIntLE(messages.size()); - - // Writing index - var position = 0; - for (var message : messages) { - - // The logic in messages_batch_mut.rs#message_start_position checks the - // previous index to get the starting position of the message. - // For the first message it's always 0. - // This is the reason why we are setting the position to start of the next - // message. - - // This used as both start index of next message and - // the end position for the current message. - position += message.getSize(); - - // offset - payload.writeIntLE(0); - // position - payload.writeIntLE(position); - // timestamp. - payload.writeZero(8); - } - - for (var message : messages) { - payload.writeBytes(toBytes(message)); - } - - tcpClient.send(CommandCode.Messages.SEND, payload); + FutureUtil.resolve(delegate.sendMessages(streamId, topicId, partitioning, messages)); } } diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PartitionsTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PartitionsTcpClient.java index 7678d7c6b..d2060b2e3 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PartitionsTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PartitionsTcpClient.java @@ -22,31 +22,22 @@ package org.apache.iggy.client.blocking.tcp; import org.apache.iggy.client.blocking.PartitionsClient; import org.apache.iggy.identifier.StreamId; import org.apache.iggy.identifier.TopicId; -import org.apache.iggy.serde.CommandCode; -import static org.apache.iggy.serde.BytesSerializer.toBytes; +final class PartitionsTcpClient implements PartitionsClient { -class PartitionsTcpClient implements PartitionsClient { + private final org.apache.iggy.client.async.PartitionsClient delegate; - private final InternalTcpClient tcpClient; - - PartitionsTcpClient(InternalTcpClient tcpClient) { - this.tcpClient = tcpClient; + PartitionsTcpClient(org.apache.iggy.client.async.PartitionsClient delegate) { + this.delegate = delegate; } @Override public void createPartitions(StreamId streamId, TopicId topicId, Long partitionsCount) { - var payload = toBytes(streamId); - payload.writeBytes(toBytes(topicId)); - payload.writeIntLE(partitionsCount.intValue()); - tcpClient.send(CommandCode.Partition.CREATE, payload); + FutureUtil.resolve(delegate.createPartitions(streamId, topicId, partitionsCount)); } @Override public void deletePartitions(StreamId streamId, TopicId topicId, Long partitionsCount) { - var payload = toBytes(streamId); - payload.writeBytes(toBytes(topicId)); - payload.writeIntLE(partitionsCount.intValue()); - tcpClient.send(CommandCode.Partition.DELETE, payload); + FutureUtil.resolve(delegate.deletePartitions(streamId, topicId, partitionsCount)); } } diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PersonalAccessTokensTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PersonalAccessTokensTcpClient.java index b3535b888..3285be9ad 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PersonalAccessTokensTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PersonalAccessTokensTcpClient.java @@ -19,56 +19,39 @@ package org.apache.iggy.client.blocking.tcp; -import io.netty.buffer.Unpooled; import org.apache.iggy.client.blocking.PersonalAccessTokensClient; import org.apache.iggy.personalaccesstoken.PersonalAccessTokenInfo; import org.apache.iggy.personalaccesstoken.RawPersonalAccessToken; -import org.apache.iggy.serde.BytesDeserializer; -import org.apache.iggy.serde.CommandCode; import org.apache.iggy.user.IdentityInfo; import java.math.BigInteger; import java.util.List; -import java.util.Optional; -import static org.apache.iggy.serde.BytesSerializer.toBytes; -import static org.apache.iggy.serde.BytesSerializer.toBytesAsU64; +final class PersonalAccessTokensTcpClient implements PersonalAccessTokensClient { -class PersonalAccessTokensTcpClient implements PersonalAccessTokensClient { + private final org.apache.iggy.client.async.PersonalAccessTokensClient delegate; - private final InternalTcpClient tcpClient; - - public PersonalAccessTokensTcpClient(InternalTcpClient tcpClient) { - this.tcpClient = tcpClient; + PersonalAccessTokensTcpClient(org.apache.iggy.client.async.PersonalAccessTokensClient delegate) { + this.delegate = delegate; } @Override public RawPersonalAccessToken createPersonalAccessToken(String name, BigInteger expiry) { - var payload = Unpooled.buffer(); - payload.writeBytes(toBytes(name)); - payload.writeBytes(toBytesAsU64(expiry)); - return tcpClient.exchangeForEntity( - CommandCode.PersonalAccessToken.CREATE, payload, BytesDeserializer::readRawPersonalAccessToken); + return FutureUtil.resolve(delegate.createPersonalAccessToken(name, expiry)); } @Override public List<PersonalAccessTokenInfo> getPersonalAccessTokens() { - return tcpClient.exchangeForList( - CommandCode.PersonalAccessToken.GET_ALL, BytesDeserializer::readPersonalAccessTokenInfo); + return FutureUtil.resolve(delegate.getPersonalAccessTokens()); } @Override public void deletePersonalAccessToken(String name) { - var payload = toBytes(name); - tcpClient.send(CommandCode.PersonalAccessToken.DELETE, payload); + FutureUtil.resolve(delegate.deletePersonalAccessToken(name)); } @Override public IdentityInfo loginWithPersonalAccessToken(String token) { - var payload = toBytes(token); - return tcpClient.exchangeForEntity(CommandCode.PersonalAccessToken.LOGIN, payload, buf -> { - var userId = buf.readUnsignedIntLE(); - return new IdentityInfo(userId, Optional.empty()); - }); + return FutureUtil.resolve(delegate.loginWithPersonalAccessToken(token)); } } diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/StreamsTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/StreamsTcpClient.java index 7605f164e..8f7e35ce9 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/StreamsTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/StreamsTcpClient.java @@ -19,62 +19,44 @@ package org.apache.iggy.client.blocking.tcp; -import io.netty.buffer.Unpooled; import org.apache.iggy.client.blocking.StreamsClient; import org.apache.iggy.identifier.StreamId; -import org.apache.iggy.serde.BytesDeserializer; -import org.apache.iggy.serde.BytesSerializer; -import org.apache.iggy.serde.CommandCode; import org.apache.iggy.stream.StreamBase; import org.apache.iggy.stream.StreamDetails; import java.util.List; import java.util.Optional; -import static org.apache.iggy.serde.BytesSerializer.toBytes; +final class StreamsTcpClient implements StreamsClient { -class StreamsTcpClient implements StreamsClient { + private final org.apache.iggy.client.async.StreamsClient delegate; - private final InternalTcpClient tcpClient; - - StreamsTcpClient(InternalTcpClient tcpClient) { - this.tcpClient = tcpClient; + StreamsTcpClient(org.apache.iggy.client.async.StreamsClient delegate) { + this.delegate = delegate; } @Override - public StreamDetails createStream(String name) { - var payloadSize = 1 + name.length(); - var payload = Unpooled.buffer(payloadSize); - - payload.writeBytes(BytesSerializer.toBytes(name)); - return tcpClient.exchangeForEntity(CommandCode.Stream.CREATE, payload, BytesDeserializer::readStreamDetails); + public Optional<StreamDetails> getStream(StreamId streamId) { + return FutureUtil.resolve(delegate.getStream(streamId)); } @Override - public Optional<StreamDetails> getStream(StreamId streamId) { - var payload = toBytes(streamId); - return tcpClient.exchangeForOptional(CommandCode.Stream.GET, payload, BytesDeserializer::readStreamDetails); + public List<StreamBase> getStreams() { + return FutureUtil.resolve(delegate.getStreams()); } @Override - public List<StreamBase> getStreams() { - return tcpClient.exchangeForList(CommandCode.Stream.GET_ALL, BytesDeserializer::readStreamBase); + public StreamDetails createStream(String name) { + return FutureUtil.resolve(delegate.createStream(name)); } @Override public void updateStream(StreamId streamId, String name) { - var payloadSize = 1 + name.length(); - var idBytes = toBytes(streamId); - var payload = Unpooled.buffer(payloadSize + idBytes.capacity()); - - payload.writeBytes(idBytes); - payload.writeBytes(BytesSerializer.toBytes(name)); - tcpClient.send(CommandCode.Stream.UPDATE, payload); + FutureUtil.resolve(delegate.updateStream(streamId, name)); } @Override public void deleteStream(StreamId streamId) { - var payload = toBytes(streamId); - tcpClient.send(CommandCode.Stream.DELETE, payload); + FutureUtil.resolve(delegate.deleteStream(streamId)); } } diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/SystemTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/SystemTcpClient.java index 70277b742..1d1df7fa4 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/SystemTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/SystemTcpClient.java @@ -19,52 +19,43 @@ package org.apache.iggy.client.blocking.tcp; -import io.netty.buffer.Unpooled; import org.apache.iggy.client.blocking.SystemClient; -import org.apache.iggy.serde.BytesDeserializer; -import org.apache.iggy.serde.CommandCode; import org.apache.iggy.system.ClientInfo; import org.apache.iggy.system.ClientInfoDetails; import org.apache.iggy.system.Stats; import java.util.List; -class SystemTcpClient implements SystemClient { +final class SystemTcpClient implements SystemClient { - private final InternalTcpClient tcpClient; + private final org.apache.iggy.client.async.SystemClient delegate; - SystemTcpClient(InternalTcpClient tcpClient) { - this.tcpClient = tcpClient; + SystemTcpClient(org.apache.iggy.client.async.SystemClient delegate) { + this.delegate = delegate; } @Override public Stats getStats() { - return tcpClient.exchangeForEntity( - CommandCode.System.GET_STATS, Unpooled.EMPTY_BUFFER, BytesDeserializer::readStats); + return FutureUtil.resolve(delegate.getStats()); } @Override public ClientInfoDetails getMe() { - return tcpClient.exchangeForEntity( - CommandCode.System.GET_ME, Unpooled.EMPTY_BUFFER, BytesDeserializer::readClientInfoDetails); + return FutureUtil.resolve(delegate.getMe()); } @Override public ClientInfoDetails getClient(Long clientId) { - var payload = Unpooled.buffer(4); - payload.writeIntLE(clientId.intValue()); - return tcpClient.exchangeForEntity( - CommandCode.System.GET_CLIENT, payload, BytesDeserializer::readClientInfoDetails); + return FutureUtil.resolve(delegate.getClient(clientId)); } @Override public List<ClientInfo> getClients() { - return tcpClient.exchangeForList(CommandCode.System.GET_ALL_CLIENTS, BytesDeserializer::readClientInfo); + return FutureUtil.resolve(delegate.getClients()); } @Override public String ping() { - tcpClient.send(CommandCode.System.PING); - return ""; + return FutureUtil.resolve(delegate.ping()); } } diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/TopicsTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/TopicsTcpClient.java index 01392dc55..1900cab15 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/TopicsTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/TopicsTcpClient.java @@ -19,13 +19,9 @@ package org.apache.iggy.client.blocking.tcp; -import io.netty.buffer.Unpooled; import org.apache.iggy.client.blocking.TopicsClient; import org.apache.iggy.identifier.StreamId; import org.apache.iggy.identifier.TopicId; -import org.apache.iggy.serde.BytesDeserializer; -import org.apache.iggy.serde.BytesSerializer; -import org.apache.iggy.serde.CommandCode; import org.apache.iggy.topic.CompressionAlgorithm; import org.apache.iggy.topic.Topic; import org.apache.iggy.topic.TopicDetails; @@ -34,28 +30,22 @@ import java.math.BigInteger; import java.util.List; import java.util.Optional; -import static org.apache.iggy.serde.BytesSerializer.toBytes; -import static org.apache.iggy.serde.BytesSerializer.toBytesAsU64; +final class TopicsTcpClient implements TopicsClient { -class TopicsTcpClient implements TopicsClient { + private final org.apache.iggy.client.async.TopicsClient delegate; - private final InternalTcpClient tcpClient; - - TopicsTcpClient(InternalTcpClient tcpClient) { - this.tcpClient = tcpClient; + TopicsTcpClient(org.apache.iggy.client.async.TopicsClient delegate) { + this.delegate = delegate; } @Override public Optional<TopicDetails> getTopic(StreamId streamId, TopicId topicId) { - var payload = toBytes(streamId); - payload.writeBytes(toBytes(topicId)); - return tcpClient.exchangeForOptional(CommandCode.Topic.GET, payload, BytesDeserializer::readTopicDetails); + return FutureUtil.resolve(delegate.getTopic(streamId, topicId)); } @Override public List<Topic> getTopics(StreamId streamId) { - var payload = toBytes(streamId); - return tcpClient.exchangeForList(CommandCode.Topic.GET_ALL, payload, BytesDeserializer::readTopic); + return FutureUtil.resolve(delegate.getTopics(streamId)); } @Override @@ -67,18 +57,8 @@ class TopicsTcpClient implements TopicsClient { BigInteger maxTopicSize, Optional<Short> replicationFactor, String name) { - var streamIdBytes = toBytes(streamId); - var payload = Unpooled.buffer(23 + streamIdBytes.readableBytes() + name.length()); - - payload.writeBytes(streamIdBytes); - payload.writeIntLE(partitionsCount.intValue()); - payload.writeByte(compressionAlgorithm.asCode()); - payload.writeBytes(toBytesAsU64(messageExpiry)); - payload.writeBytes(toBytesAsU64(maxTopicSize)); - payload.writeByte(replicationFactor.orElse((short) 0)); - payload.writeBytes(BytesSerializer.toBytes(name)); - - return tcpClient.exchangeForEntity(CommandCode.Topic.CREATE, payload, BytesDeserializer::readTopicDetails); + return FutureUtil.resolve(delegate.createTopic( + streamId, partitionsCount, compressionAlgorithm, messageExpiry, maxTopicSize, replicationFactor, name)); } @Override @@ -90,22 +70,12 @@ class TopicsTcpClient implements TopicsClient { BigInteger maxTopicSize, Optional<Short> replicationFactor, String name) { - var payload = Unpooled.buffer(); - payload.writeBytes(toBytes(streamId)); - payload.writeBytes(toBytes(topicId)); - payload.writeByte(compressionAlgorithm.asCode()); - payload.writeBytes(toBytesAsU64(messageExpiry)); - payload.writeBytes(toBytesAsU64(maxTopicSize)); - payload.writeByte(replicationFactor.orElse((short) 0)); - payload.writeBytes(BytesSerializer.toBytes(name)); - - tcpClient.send(CommandCode.Topic.UPDATE, payload); + FutureUtil.resolve(delegate.updateTopic( + streamId, topicId, compressionAlgorithm, messageExpiry, maxTopicSize, replicationFactor, name)); } @Override public void deleteTopic(StreamId streamId, TopicId topicId) { - var payload = toBytes(streamId); - payload.writeBytes(toBytes(topicId)); - tcpClient.send(CommandCode.Topic.DELETE, payload); + FutureUtil.resolve(delegate.deleteTopic(streamId, topicId)); } } diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/UsersTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/UsersTcpClient.java index adab5fe1f..416885f5d 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/UsersTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/UsersTcpClient.java @@ -19,13 +19,8 @@ package org.apache.iggy.client.blocking.tcp; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import org.apache.iggy.IggyVersion; import org.apache.iggy.client.blocking.UsersClient; import org.apache.iggy.identifier.UserId; -import org.apache.iggy.serde.BytesDeserializer; -import org.apache.iggy.serde.CommandCode; import org.apache.iggy.user.IdentityInfo; import org.apache.iggy.user.Permissions; import org.apache.iggy.user.UserInfo; @@ -35,116 +30,57 @@ import org.apache.iggy.user.UserStatus; import java.util.List; import java.util.Optional; -import static org.apache.iggy.serde.BytesSerializer.toBytes; +final class UsersTcpClient implements UsersClient { -class UsersTcpClient implements UsersClient { + private final org.apache.iggy.client.async.UsersClient delegate; - private final InternalTcpClient tcpClient; - - UsersTcpClient(InternalTcpClient tcpClient) { - this.tcpClient = tcpClient; + UsersTcpClient(org.apache.iggy.client.async.UsersClient delegate) { + this.delegate = delegate; } @Override public Optional<UserInfoDetails> getUser(UserId userId) { - var payload = toBytes(userId); - return tcpClient.exchangeForOptional(CommandCode.User.GET, payload, BytesDeserializer::readUserInfoDetails); + return FutureUtil.resolve(delegate.getUser(userId)); } @Override public List<UserInfo> getUsers() { - return tcpClient.exchangeForList(CommandCode.User.GET_ALL, BytesDeserializer::readUserInfo); + return FutureUtil.resolve(delegate.getUsers()); } @Override public UserInfoDetails createUser( String username, String password, UserStatus status, Optional<Permissions> permissions) { - var payload = Unpooled.buffer(); - payload.writeBytes(toBytes(username)); - payload.writeBytes(toBytes(password)); - payload.writeByte(status.asCode()); - permissions.ifPresentOrElse( - perms -> { - payload.writeByte(1); - var permissionBytes = toBytes(perms); - payload.writeIntLE(permissionBytes.readableBytes()); - payload.writeBytes(permissionBytes); - }, - () -> payload.writeByte(0)); - - return tcpClient.exchangeForEntity(CommandCode.User.CREATE, payload, BytesDeserializer::readUserInfoDetails); + return FutureUtil.resolve(delegate.createUser(username, password, status, permissions)); } @Override public void deleteUser(UserId userId) { - var payload = toBytes(userId); - tcpClient.send(CommandCode.User.DELETE, payload); + FutureUtil.resolve(delegate.deleteUser(userId)); } @Override - public void updateUser(UserId userId, Optional<String> usernameOptional, Optional<UserStatus> statusOptional) { - var payload = toBytes(userId); - usernameOptional.ifPresentOrElse( - (username) -> { - payload.writeByte(1); - payload.writeBytes(toBytes(username)); - }, - () -> payload.writeByte(0)); - statusOptional.ifPresentOrElse( - (status) -> { - payload.writeByte(1); - payload.writeByte(status.asCode()); - }, - () -> payload.writeByte(0)); - - tcpClient.send(CommandCode.User.UPDATE, payload); + public void updateUser(UserId userId, Optional<String> username, Optional<UserStatus> status) { + FutureUtil.resolve(delegate.updateUser(userId, username, status)); } @Override - public void updatePermissions(UserId userId, Optional<Permissions> permissionsOptional) { - var payload = toBytes(userId); - - permissionsOptional.ifPresentOrElse( - permissions -> { - payload.writeByte(1); - var permissionBytes = toBytes(permissions); - payload.writeIntLE(permissionBytes.readableBytes()); - payload.writeBytes(permissionBytes); - }, - () -> payload.writeByte(0)); - - tcpClient.send(CommandCode.User.UPDATE_PERMISSIONS, payload); + public void updatePermissions(UserId userId, Optional<Permissions> permissions) { + FutureUtil.resolve(delegate.updatePermissions(userId, permissions)); } @Override public void changePassword(UserId userId, String currentPassword, String newPassword) { - var payload = toBytes(userId); - payload.writeBytes(toBytes(currentPassword)); - payload.writeBytes(toBytes(newPassword)); - - tcpClient.send(CommandCode.User.CHANGE_PASSWORD, payload); + FutureUtil.resolve(delegate.changePassword(userId, currentPassword, newPassword)); } @Override public IdentityInfo login(String username, String password) { - String version = IggyVersion.getInstance().getUserAgent(); - String context = IggyVersion.getInstance().toString(); - var payloadSize = 2 + username.length() + password.length() + 4 + version.length() + 4 + context.length(); - var payload = Unpooled.buffer(payloadSize); - - payload.writeBytes(toBytes(username)); - payload.writeBytes(toBytes(password)); - payload.writeIntLE(version.length()); - payload.writeBytes(version.getBytes()); - payload.writeIntLE(context.length()); - payload.writeBytes(context.getBytes()); - - var userId = tcpClient.exchangeForEntity(CommandCode.User.LOGIN, payload, ByteBuf::readUnsignedIntLE); - return new IdentityInfo(userId, Optional.empty()); + return FutureUtil.resolve(delegate.login(username, password)); } @Override public void logout() { - tcpClient.send(CommandCode.User.LOGOUT); + FutureUtil.resolve(delegate.logout()); } } diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/package-info.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/package-info.java index 8d83dc07f..192cbb8fa 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/package-info.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/package-info.java @@ -17,6 +17,15 @@ * under the License. */ +/** + * Blocking TCP client implementation for Apache Iggy. + * + * <p>This package provides blocking (synchronous) wrappers over the + * {@link org.apache.iggy.client.async.tcp async TCP client} implementation. + * Each blocking sub-client delegates to its async counterpart and unwraps + * the {@link java.util.concurrent.CompletableFuture} result via + * {@code FutureUtil.resolve()}. + */ @NonNullApi package org.apache.iggy.client.blocking.tcp;
