hubcio commented on code in PR #3235:
URL: https://github.com/apache/iggy/pull/3235#discussion_r3246493200


##########
foreign/php/src/receive_message.rs:
##########
@@ -0,0 +1,148 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use ext_php_rs::{binary::Binary, php_class, php_impl};
+use iggy::prelude::{
+    IggyMessage as RustReceiveMessage, IggyMessageHeader, PollingStrategy as 
RustPollingStrategy,
+};
+
+/// A Python class representing a received message.

Review Comment:
   python?



##########
foreign/php/src/consumer.rs:
##########
@@ -0,0 +1,280 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::{sync::Arc, time::Duration};
+
+use ext_php_rs::{
+    exception::{PhpException, PhpResult},
+    php_class, php_impl,
+    types::ZendCallable,
+};
+use iggy::prelude::{
+    AutoCommit as RustAutoCommit, AutoCommitAfter as RustAutoCommitAfter,
+    AutoCommitWhen as RustAutoCommitWhen, IggyConsumer as RustIggyConsumer, 
IggyDuration,
+};
+use tokio::sync::Mutex;
+
+use crate::iterator::ReceiveMessageIterator;
+
+/// A PHP class representing the Iggy consumer.
+#[php_class]
+pub struct IggyConsumer {
+    pub(crate) inner: Arc<Mutex<RustIggyConsumer>>,
+}
+
+#[php_impl]
+impl IggyConsumer {
+    /// Get the last consumed offset or null if no offset has been consumed 
yet.
+    pub fn get_last_consumed_offset(&self, partition_id: u32) -> Option<u64> {
+        self.inner
+            .blocking_lock()
+            .get_last_consumed_offset(partition_id)
+    }
+
+    /// Get the last stored offset or null if no offset has been stored yet.
+    pub fn get_last_stored_offset(&self, partition_id: u32) -> Option<u64> {
+        self.inner
+            .blocking_lock()
+            .get_last_stored_offset(partition_id)
+    }
+
+    /// Gets the name of the consumer group.
+    pub fn name(&self) -> String {
+        self.inner.blocking_lock().name().to_string()
+    }
+
+    /// Gets the current partition id or 0 if no messages have been polled yet.
+    pub fn partition_id(&self) -> u32 {
+        self.inner.blocking_lock().partition_id()

Review Comment:
   why are you sometimes using blocking_lock vs block_on?



##########
foreign/php/src/identifier.rs:
##########
@@ -0,0 +1,59 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::str::FromStr;
+
+use ext_php_rs::{convert::FromZval, flags::DataType, types::Zval};
+use iggy::prelude::{IdKind, Identifier};
+
+pub enum PhpIdentifier {
+    String(String),
+    Int(u32),
+}
+
+impl FromZval<'_> for PhpIdentifier {
+    const TYPE: DataType = DataType::Mixed;
+
+    fn from_zval(zval: &Zval) -> Option<Self> {
+        if let Some(value) = zval.string() {
+            return Some(Self::String(value));
+        }
+
+        zval.long()
+            .and_then(|value| u32::try_from(value).ok())
+            .map(Self::Int)
+    }
+}
+
+impl From<PhpIdentifier> for Identifier {
+    fn from(identifier: PhpIdentifier) -> Self {
+        match identifier {
+            PhpIdentifier::String(value) => 
Identifier::from_str(&value).unwrap(),
+            PhpIdentifier::Int(value) => Identifier::numeric(value).unwrap(),

Review Comment:
   dont unwrap in prod code



##########
foreign/php/src/client.rs:
##########
@@ -0,0 +1,350 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::{str::FromStr, sync::Arc, time::Duration};
+
+use ext_php_rs::{
+    exception::{PhpException, PhpResult},
+    php_class, php_impl,
+};
+use iggy::prelude::{
+    CompressionAlgorithm, Consumer as RustConsumer, Identifier, IggyClient as 
RustIggyClient,
+    IggyClientBuilder, IggyDuration, IggyExpiry, IggyMessage as RustMessage, 
MaxTopicSize,
+    Partitioning, PollingStrategy as RustPollingStrategy, *,
+};
+use tokio::sync::Mutex;
+
+use crate::consumer::{AutoCommit, IggyConsumer};
+use crate::identifier::PhpIdentifier;
+use crate::receive_message::{PollingStrategy, ReceiveMessage};
+use crate::send_message::SendMessage;
+use crate::stream::StreamDetails;
+use crate::topic::TopicDetails;
+
+/// A PHP class representing the Iggy client.
+#[php_class]
+pub struct IggyClient {
+    inner: Arc<RustIggyClient>,
+}
+
+#[php_impl]
+impl IggyClient {
+    /// Constructs a new IggyClient from a TCP server address.
+    #[php(constructor)]
+    pub fn __construct(conn: Option<String>) -> PhpResult<Self> {
+        let client = IggyClientBuilder::new()
+            .with_tcp()
+            .with_server_address(conn.unwrap_or_else(|| 
"127.0.0.1:8090".to_string()))
+            .build()
+            .map_err(to_php_exception)?;
+
+        Ok(Self {
+            inner: Arc::new(client),
+        })
+    }
+
+    /// Constructs a new IggyClient from a connection string.
+    pub fn from_connection_string(connection_string: String) -> 
PhpResult<Self> {
+        let client =
+            
RustIggyClient::from_connection_string(&connection_string).map_err(to_php_exception)?;
+
+        Ok(Self {
+            inner: Arc::new(client),
+        })
+    }
+
+    /// Sends a ping request to the server.
+    pub fn ping(&self) -> PhpResult {
+        let inner = self.inner.clone();
+        runtime().block_on(async move { 
inner.ping().await.map_err(to_php_exception) })
+    }
+
+    /// Logs in the user with the given credentials.
+    pub fn login_user(&self, username: String, password: String) -> PhpResult {
+        let inner = self.inner.clone();
+
+        runtime().block_on(async move {
+            inner
+                .login_user(&username, &password)
+                .await
+                .map(|_| ())
+                .map_err(to_php_exception)
+        })
+    }
+
+    /// Connects the IggyClient to its service.
+    pub fn connect(&self) -> PhpResult {
+        let inner = self.inner.clone();
+        runtime().block_on(async move { 
inner.connect().await.map_err(to_php_exception) })
+    }
+
+    /// Creates a new stream.
+    pub fn create_stream(&self, name: String) -> PhpResult {
+        let inner = self.inner.clone();
+
+        runtime().block_on(async move {
+            inner
+                .create_stream(&name)
+                .await
+                .map(|_| ())
+                .map_err(to_php_exception)
+        })
+    }
+
+    /// Gets a stream by id or name.
+    pub fn get_stream(&self, stream_id: PhpIdentifier) -> 
PhpResult<Option<StreamDetails>> {
+        let stream_id = Identifier::from(stream_id);
+        let inner = self.inner.clone();
+
+        runtime().block_on(async move {
+            inner
+                .get_stream(&stream_id)
+                .await
+                .map(|stream| stream.map(StreamDetails::from))
+                .map_err(to_php_exception)
+        })
+    }
+
+    /// Creates a topic.
+    ///
+    /// message_expiry_micros is null for server default.
+    #[allow(clippy::too_many_arguments)]
+    pub fn create_topic(
+        &self,
+        stream: PhpIdentifier,
+        name: String,
+        partitions_count: u32,
+        compression_algorithm: Option<String>,
+        replication_factor: Option<u8>,
+        message_expiry_micros: Option<u64>,
+        max_topic_size: Option<u64>,
+    ) -> PhpResult {
+        let compression_algorithm = match compression_algorithm {
+            Some(value) => 
CompressionAlgorithm::from_str(&value).map_err(to_php_exception)?,
+            None => CompressionAlgorithm::default(),
+        };
+        let expiry = message_expiry_micros.map_or(IggyExpiry::ServerDefault, 
|micros| {
+            IggyExpiry::ExpireDuration(iggy_duration_from_micros(micros))
+        });
+        let max_size = max_topic_size.map_or(MaxTopicSize::ServerDefault, 
MaxTopicSize::from);
+        let stream = Identifier::from(stream);
+        let inner = self.inner.clone();
+
+        runtime().block_on(async move {
+            inner
+                .create_topic(
+                    &stream,
+                    &name,
+                    partitions_count,
+                    compression_algorithm,
+                    replication_factor,
+                    expiry,
+                    max_size,
+                )
+                .await
+                .map(|_| ())
+                .map_err(to_php_exception)
+        })
+    }
+
+    /// Gets a topic by stream and topic id/name.
+    pub fn get_topic(
+        &self,
+        stream_id: PhpIdentifier,
+        topic_id: PhpIdentifier,
+    ) -> PhpResult<Option<TopicDetails>> {
+        let stream_id = Identifier::from(stream_id);
+        let topic_id = Identifier::from(topic_id);
+        let inner = self.inner.clone();
+
+        runtime().block_on(async move {
+            inner
+                .get_topic(&stream_id, &topic_id)
+                .await
+                .map(|topic| topic.map(TopicDetails::from))
+                .map_err(to_php_exception)
+        })
+    }
+
+    /// Sends messages to a topic.
+    pub fn send_messages(
+        &self,
+        stream: PhpIdentifier,
+        topic: PhpIdentifier,
+        partition_id: u32,
+        messages: Vec<&SendMessage>,
+    ) -> PhpResult {
+        let stream = Identifier::from(stream);
+        let topic = Identifier::from(topic);
+        let partitioning = Partitioning::partition_id(partition_id);
+        let mut messages: Vec<RustMessage> = messages
+            .into_iter()
+            .map(|message| (*message).clone().inner)
+            .collect();
+        let inner = self.inner.clone();
+
+        runtime().block_on(async move {
+            inner
+                .send_messages(&stream, &topic, &partitioning, 
messages.as_mut())
+                .await
+                .map_err(to_php_exception)
+        })
+    }
+
+    /// Polls messages from the specified topic and partition.
+    pub fn poll_messages(
+        &self,
+        stream: PhpIdentifier,
+        topic: PhpIdentifier,
+        partition_id: u32,
+        polling_strategy: &PollingStrategy,
+        count: u32,
+        auto_commit: bool,
+    ) -> PhpResult<Vec<ReceiveMessage>> {
+        let consumer = RustConsumer::default();
+        let stream = Identifier::from(stream);
+        let topic = Identifier::from(topic);
+        let strategy: RustPollingStrategy = polling_strategy.into();
+        let inner = self.inner.clone();
+
+        runtime().block_on(async move {
+            let polled_messages = inner
+                .poll_messages(
+                    &stream,
+                    &topic,
+                    Some(partition_id),
+                    &consumer,
+                    &strategy,
+                    count,
+                    auto_commit,
+                )
+                .await
+                .map_err(to_php_exception)?;
+
+            Ok(polled_messages
+                .messages
+                .into_iter()
+                .map(|message| ReceiveMessage {
+                    inner: message,
+                    partition_id,
+                })
+                .collect())
+        })
+    }
+
+    /// Creates and initializes a consumer group consumer.
+    #[allow(clippy::too_many_arguments)]
+    pub fn consumer_group(
+        &self,
+        name: String,
+        stream: String,
+        topic: String,
+        partition_id: Option<u32>,
+        polling_strategy: Option<&PollingStrategy>,
+        batch_length: Option<u32>,
+        auto_commit: Option<&AutoCommit>,
+        create_consumer_group_if_not_exists: bool,
+        auto_join_consumer_group: bool,
+        poll_interval_micros: Option<u64>,
+        polling_retry_interval_micros: Option<u64>,
+        init_retries: Option<u32>,
+        init_retry_interval_micros: Option<u64>,
+        allow_replay: bool,
+    ) -> PhpResult<IggyConsumer> {
+        let mut builder = self
+            .inner
+            .consumer_group(&name, &stream, &topic)
+            .map_err(to_php_exception)?
+            .without_encryptor()
+            .partition(partition_id);
+
+        builder = if create_consumer_group_if_not_exists {
+            builder.create_consumer_group_if_not_exists()
+        } else {
+            builder.do_not_create_consumer_group_if_not_exists()
+        };
+        builder = if auto_join_consumer_group {
+            builder.auto_join_consumer_group()
+        } else {
+            builder.do_not_auto_join_consumer_group()
+        };
+        if let Some(polling_strategy) = polling_strategy {
+            builder = builder.polling_strategy(polling_strategy.into());
+        }
+        if let Some(batch_length) = batch_length {
+            builder = builder.batch_length(batch_length);
+        }
+        if let Some(auto_commit) = auto_commit {
+            builder = builder.auto_commit(auto_commit.into());
+        }
+        builder = match poll_interval_micros {
+            Some(micros) => 
builder.poll_interval(iggy_duration_from_micros(micros)),
+            None => builder.without_poll_interval(),
+        };
+        if let Some(micros) = polling_retry_interval_micros {
+            builder = 
builder.polling_retry_interval(iggy_duration_from_micros(micros));
+        }
+
+        match (init_retries, init_retry_interval_micros) {
+            (Some(retries), Some(micros)) => {
+                builder = builder.init_retries(retries, 
iggy_duration_from_micros(micros));
+            }
+            (Some(_), None) => {
+                return Err(
+                    "'init_retry_interval_micros' is required if 
'init_retries' is set".into(),
+                );
+            }
+            (None, Some(_)) => {
+                return Err(
+                    "'init_retries' is required if 
'init_retry_interval_micros' is set".into(),
+                );
+            }
+            (None, None) => {}
+        }
+        if allow_replay {
+            builder = builder.allow_replay();
+        }
+
+        let mut consumer = builder.build();
+        runtime().block_on(async move {
+            consumer.init().await.map_err(to_php_exception)?;
+            Ok(IggyConsumer {
+                inner: Arc::new(Mutex::new(consumer)),
+            })
+        })
+    }
+}
+
+fn to_php_exception(error: impl std::fmt::Display) -> PhpException {
+    PhpException::default(error.to_string())
+}
+
+fn iggy_duration_from_micros(micros: u64) -> IggyDuration {
+    IggyDuration::new(Duration::from_micros(micros))
+}
+
+fn runtime() -> &'static tokio::runtime::Runtime {
+    static RUNTIME: std::sync::OnceLock<tokio::runtime::Runtime> = 
std::sync::OnceLock::new();
+
+    RUNTIME.get_or_init(|| {
+        tokio::runtime::Builder::new_multi_thread()
+            .enable_all()
+            .build()
+            .expect("failed to initialize Tokio runtime")
+    })
+}

Review Comment:
   why are you creating 3 runtimes in client.rs / consumer.rs / iterator.rs?



##########
foreign/php/src/identifier.rs:
##########
@@ -0,0 +1,59 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::str::FromStr;
+
+use ext_php_rs::{convert::FromZval, flags::DataType, types::Zval};
+use iggy::prelude::{IdKind, Identifier};
+
+pub enum PhpIdentifier {
+    String(String),
+    Int(u32),
+}
+
+impl FromZval<'_> for PhpIdentifier {
+    const TYPE: DataType = DataType::Mixed;
+
+    fn from_zval(zval: &Zval) -> Option<Self> {
+        if let Some(value) = zval.string() {
+            return Some(Self::String(value));
+        }
+
+        zval.long()
+            .and_then(|value| u32::try_from(value).ok())
+            .map(Self::Int)
+    }
+}
+
+impl From<PhpIdentifier> for Identifier {
+    fn from(identifier: PhpIdentifier) -> Self {
+        match identifier {
+            PhpIdentifier::String(value) => 
Identifier::from_str(&value).unwrap(),
+            PhpIdentifier::Int(value) => Identifier::numeric(value).unwrap(),
+        }
+    }
+}
+
+impl From<&Identifier> for PhpIdentifier {
+    fn from(value: &Identifier) -> PhpIdentifier {
+        match value.kind {
+            IdKind::String => 
PhpIdentifier::String(value.get_string_value().unwrap()),
+            IdKind::Numeric => 
PhpIdentifier::Int(value.get_u32_value().unwrap()),

Review Comment:
   dont unwrap in prod code



##########
foreign/php/Dockerfile.test:
##########
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+FROM rust:1.95-slim-bookworm
+
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    ca-certificates \
+    curl \
+    git \
+    clang \
+    libclang-dev \
+    libssl-dev \
+    php-cli \
+    php-dev \
+    pkg-config \
+    unzip \
+    && rm -rf /var/lib/apt/lists/*
+
+ENV PHP=/usr/bin/php
+ENV PHP_CONFIG=/usr/bin/php-config
+ENV IGGY_HOST=iggy-server
+ENV IGGY_PORT=8090
+
+WORKDIR /workspace
+
+COPY Cargo.toml Cargo.lock ./
+COPY core/ ./core/
+
+COPY foreign/php/Cargo.toml ./foreign/php/
+COPY foreign/php/composer.json ./foreign/php/
+COPY foreign/php/README.md foreign/php/LICENSE foreign/php/NOTICE 
./foreign/php/
+COPY foreign/php/.cargo/ ./foreign/php/.cargo/
+COPY foreign/php/src/ ./foreign/php/src/
+COPY foreign/php/tests/ ./foreign/php/tests/
+COPY foreign/php/scripts/ ./foreign/php/scripts/
+
+WORKDIR /workspace/foreign/php
+
+RUN cargo install cargo-php --locked
+RUN cargo php install --release --yes

Review Comment:
   remove --release



##########
foreign/php/tests/IggySdkTest.php:
##########
@@ -0,0 +1,364 @@
+<?php
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+declare(strict_types=1);
+
+final class IggySdkTest
+{
+    public function testPing(): void
+    {
+        new_client()->ping();
+    }
+
+    public function testClientNotNull(): void
+    {
+        assert_true(new_client() instanceof IggyClient);
+    }
+
+    public function testClientFromConnectionString(): void
+    {
+        $client = new_connection_string_client();
+        $client->ping();
+    }
+
+    public function testCreateAndGetStream(): void
+    {
+        $client = new_client();
+        $streamName = unique_name('test-stream');
+
+        $client->createStream($streamName);
+        $stream = $client->getStream($streamName);
+
+        assert_not_null($stream);
+        assert_same($streamName, $stream->name);
+        assert_true($stream->id >= 0, 'expected non-negative stream id');
+    }
+
+    public function testNewStreamHasNoTopics(): void
+    {
+        $client = new_client();
+        $streamName = unique_name('test-stream');
+
+        $client->createStream($streamName);
+        $stream = $client->getStream($streamName);
+
+        assert_not_null($stream);
+        assert_same($streamName, $stream->name);
+        assert_true($stream->id > 0, 'expected positive stream id');
+        assert_same(0, $stream->topics_count);
+    }
+
+    public function testCreateAndGetTopic(): void
+    {
+        $client = new_client();
+        $streamName = unique_name('test-stream');
+        $topicName = unique_name('test-topic');
+
+        $client->createStream($streamName);
+        $client->createTopic($streamName, $topicName, 2, null, null, null, 
null);
+        $topic = $client->getTopic($streamName, $topicName);
+
+        assert_not_null($topic);
+        assert_same($topicName, $topic->name);
+        assert_true($topic->id >= 0, 'expected non-negative topic id');
+        assert_same(2, $topic->partitions_count);
+    }
+
+    public function testListTopicsViaGetTopic(): void
+    {
+        $client = new_client();
+        $streamName = unique_name('test-stream');
+        $topicName = unique_name('test-topic');
+
+        create_stream_and_topic($client, $streamName, $topicName);
+        $topic = $client->getTopic($streamName, $topicName);
+
+        assert_not_null($topic);
+        assert_same($topicName, $topic->name);
+        assert_true($topic->id >= 0, 'expected non-negative topic id');
+        assert_same(1, $topic->partitions_count);
+    }
+
+    public function testSendAndPollMessages(): void
+    {
+        $client = new_client();
+        $streamName = unique_name('msg-stream');
+        $topicName = unique_name('msg-topic');
+        $partitionId = 0;
+        $messages = array_map(
+            static fn (int $i): string => "Test message {$i} - {$streamName}",
+            range(1, 3),
+        );
+
+        create_stream_and_topic($client, $streamName, $topicName);
+        $client->sendMessages(
+            $streamName,
+            $topicName,
+            $partitionId,
+            array_map(static fn (string $payload): SendMessage => new 
SendMessage($payload), $messages),
+        );
+
+        $polled = $client->pollMessages($streamName, $topicName, $partitionId, 
PollingStrategy::first(), 10, true);
+        assert_true(count($polled) >= count($messages), 'expected at least the 
sent messages');
+        assert_same($messages, array_slice(collect_payloads($polled), 0, 
count($messages)));
+    }

Review Comment:
   same test as 124-146



##########
foreign/php/tests/IggySdkTest.php:
##########
@@ -0,0 +1,364 @@
+<?php
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+declare(strict_types=1);
+
+final class IggySdkTest
+{
+    public function testPing(): void
+    {
+        new_client()->ping();
+    }
+
+    public function testClientNotNull(): void
+    {
+        assert_true(new_client() instanceof IggyClient);
+    }
+
+    public function testClientFromConnectionString(): void
+    {
+        $client = new_connection_string_client();
+        $client->ping();
+    }
+
+    public function testCreateAndGetStream(): void
+    {
+        $client = new_client();
+        $streamName = unique_name('test-stream');
+
+        $client->createStream($streamName);
+        $stream = $client->getStream($streamName);
+
+        assert_not_null($stream);
+        assert_same($streamName, $stream->name);
+        assert_true($stream->id >= 0, 'expected non-negative stream id');
+    }
+
+    public function testNewStreamHasNoTopics(): void
+    {
+        $client = new_client();
+        $streamName = unique_name('test-stream');
+
+        $client->createStream($streamName);
+        $stream = $client->getStream($streamName);
+
+        assert_not_null($stream);
+        assert_same($streamName, $stream->name);
+        assert_true($stream->id > 0, 'expected positive stream id');
+        assert_same(0, $stream->topics_count);
+    }
+
+    public function testCreateAndGetTopic(): void
+    {
+        $client = new_client();
+        $streamName = unique_name('test-stream');
+        $topicName = unique_name('test-topic');
+
+        $client->createStream($streamName);
+        $client->createTopic($streamName, $topicName, 2, null, null, null, 
null);
+        $topic = $client->getTopic($streamName, $topicName);
+
+        assert_not_null($topic);
+        assert_same($topicName, $topic->name);
+        assert_true($topic->id >= 0, 'expected non-negative topic id');
+        assert_same(2, $topic->partitions_count);
+    }
+
+    public function testListTopicsViaGetTopic(): void
+    {
+        $client = new_client();
+        $streamName = unique_name('test-stream');
+        $topicName = unique_name('test-topic');
+
+        create_stream_and_topic($client, $streamName, $topicName);
+        $topic = $client->getTopic($streamName, $topicName);
+
+        assert_not_null($topic);
+        assert_same($topicName, $topic->name);
+        assert_true($topic->id >= 0, 'expected non-negative topic id');
+        assert_same(1, $topic->partitions_count);
+    }
+
+    public function testSendAndPollMessages(): void
+    {
+        $client = new_client();
+        $streamName = unique_name('msg-stream');
+        $topicName = unique_name('msg-topic');
+        $partitionId = 0;
+        $messages = array_map(
+            static fn (int $i): string => "Test message {$i} - {$streamName}",
+            range(1, 3),
+        );
+
+        create_stream_and_topic($client, $streamName, $topicName);
+        $client->sendMessages(
+            $streamName,
+            $topicName,
+            $partitionId,
+            array_map(static fn (string $payload): SendMessage => new 
SendMessage($payload), $messages),
+        );
+
+        $polled = $client->pollMessages($streamName, $topicName, $partitionId, 
PollingStrategy::first(), 10, true);
+        assert_true(count($polled) >= count($messages), 'expected at least the 
sent messages');
+        assert_same($messages, array_slice(collect_payloads($polled), 0, 
count($messages)));
+    }
+
+    public function testSendAndPollMessagesAsBytes(): void

Review Comment:
   where exactly does it pass "bytes"?



##########
foreign/php/src/send_message.rs:
##########
@@ -0,0 +1,84 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use bytes::Bytes;
+use ext_php_rs::{
+    binary::Binary,
+    exception::{PhpException, PhpResult},
+    php_class, php_impl,
+};
+use iggy::prelude::{IggyMessage as RustIggyMessage, IggyMessageHeader};
+
+/// A PHP class representing a message to be sent.
+#[php_class]
+pub struct SendMessage {
+    pub(crate) inner: RustIggyMessage,
+}
+
+impl Clone for SendMessage {
+    fn clone(&self) -> Self {
+        Self {
+            inner: RustIggyMessage {
+                header: IggyMessageHeader {
+                    checksum: self.inner.header.checksum,
+                    id: self.inner.header.id,
+                    offset: self.inner.header.offset,
+                    timestamp: self.inner.header.timestamp,
+                    origin_timestamp: self.inner.header.origin_timestamp,
+                    user_headers_length: self.inner.header.user_headers_length,
+                    payload_length: self.inner.header.payload_length,
+                    reserved: self.inner.header.reserved,
+                },
+                payload: self.inner.payload.clone(),
+                user_headers: self.inner.user_headers.clone(),
+            },
+        }
+    }
+}
+
+#[php_impl]
+impl SendMessage {
+    /// Constructs a new `SendMessage` instance from a PHP string.
+    ///
+    /// PHP strings are byte strings, so this accepts both text and binary 
payloads.
+    #[php(constructor)]
+    pub fn __construct(data: Binary<u8>) -> PhpResult<Self> {
+        Self::from_payload(Vec::from(data))

Review Comment:
   why so many allocations



##########
foreign/php/tests/run.php:
##########


Review Comment:
   why not use phpunit?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to