zhaohai666 commented on code in PR #1256:
URL: https://github.com/apache/rocketmq-clients/pull/1256#discussion_r3497371172


##########
php/Producer.php:
##########
@@ -1,73 +1,508 @@
-<?php
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache\Rocketmq;
-
-require 'vendor/autoload.php';
-
-
-use Apache\Rocketmq\V2\MessageQueue;
-use Apache\Rocketmq\V2\MessagingServiceClient;
-use Apache\Rocketmq\V2\QueryRouteRequest;
-use Apache\Rocketmq\V2\ReceiveMessageRequest;
-use Apache\Rocketmq\V2\Resource;
-use Grpc\ChannelCredentials;
-use const Grpc\STATUS_OK;
-
-class Producer
-{
-
-    public function init()
-    {
-        /**
-         * Client ID is currently concatenated using a fixed host name to
-         * facilitate code debugging.
-         */
-        $clientId = 'missyourlove' . '@' . posix_getpid() . '@' . rand(0, 10) 
. '@' . $this->getRandStr(10);
-        $client = new 
MessagingServiceClient('rmq-cn-cs02xhf2k01.cn-hangzhou.rmq.aliyuncs.com:8080', [
-            'credentials' => ChannelCredentials::createInsecure(),
-            'update_metadata' => function ($metaData) use ($clientId) {
-                $metaData['headers'] = ['clientID' => $clientId]; // Pass the 
ClientID to the server through the header
-                return $metaData;
-            }
-        ]);
-
-        $qr = new QueryRouteRequest();
-        $rs = new Resource();
-        $rs->setResourceNamespace('');
-        $rs->setName('normal_topic');
-        $qr->setTopic($rs);
-       $status = $client->QueryRoute($qr)->wait();
-       var_dump($status); // This prints out the response data returned by the 
server
-    }
-
-    public function getRandStr($length){
-        //Character combinations
-        $str = 
'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';
-        $len = strlen($str)-1;
-        $randstr = '';
-        for ($i=0;$i<$length;$i++) {
-            $num=mt_rand(0,$len);
-            $randstr .= $str[$num];
-        }
-        return $randstr;
-    }
-}
-
-$xx = new Producer();
-$xx->init();
+<?php
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache\Rocketmq;
+
+use Apache\Rocketmq\V2\MessagingServiceClient;
+use Apache\Rocketmq\V2\Resource;
+use Apache\Rocketmq\V2\Message;
+use Apache\Rocketmq\V2\SystemProperties;
+use Apache\Rocketmq\V2\Settings;
+use Apache\Rocketmq\V2\ClientType;
+use Apache\Rocketmq\V2\UA;
+use Apache\Rocketmq\V2\Language;
+use Apache\Rocketmq\V2\TelemetryCommand;
+use Apache\Rocketmq\V2\Publishing;
+
+/**
+ * Producer — Message producer.
+ *
+ * Core features: singleton TelemetrySession, PublishingLoadBalancer, retry,
+ * transaction support (via TransactionTrait), heartbeat (via 
HeartbeatManager),
+ * interceptor support, Swoole coroutine async support.
+ *
+ * Send and recall logic is delegated to SendMessageHandler and 
RecallMessageHandler.
+ * Use ProducerBuilder for convenient construction.
+ */
+class Producer implements TransactionCommitter, ClientTraitProvider
+{
+    use ClientTrait {
+        buildMetadata as public;
+        getOperationTimeout as public;
+        parseEndpoints as public;
+    }
+    use TransactionTrait;
+
+    private readonly MessagingServiceClient $client;
+    private readonly TelemetrySession $telemetrySession;
+    private readonly PublishingRouteManager $routeManager;
+    private readonly ProducerSettings $settings;
+    protected MessageValidator $validator;
+    private bool $isRunning = false;
+    private bool $shutdownRequested = false;
+    private array $interceptors = [];
+    private readonly Logger $logger;
+    private readonly HeartbeatManager $heartbeatManager;
+    private readonly SendMessageHandler $sendHandler;
+    private readonly RecallMessageHandler $recallHandler;
+
+    /**
+     * @param string $endpoints gRPC server endpoint
+     * @param array $options Configuration options (use ProducerBuilder 
instead)
+     *                      - clientId: string, custom client identifier 
(default: 'php-producer-{pid}-{time}')
+     *                      - maxAttempts: int, max retry attempts on send 
failure (default: 3)
+     *                      - requestTimeout: int, gRPC request timeout in ms 
(default: 3000)
+     *                      - topics: string[], topics to publish messages to 
(default: [])
+     *                      - namespace: string, resource namespace prefix 
(default: '')
+     *                      - credentials: SessionCredentials|null, AK/SK 
authentication credentials
+     *                      - validateMessageType: bool, validate message type 
against route (default: true)
+     *                      - maxBodySizeBytes: int, max message body size in 
bytes (default: 4194304)
+     *                      - tlsCredentials: TlsCredentials|null, TLS/SSL 
configuration
+     *                      - sslEnabled: bool, enable SSL for gRPC channel 
(default: true)
+     * @deprecated Use ProducerBuilder instead for better type safety and IDE 
support.
+     */
+    public function __construct(
+        private readonly string $endpoints,
+        array $options = []
+    ) {
+        $this->settings = new ProducerSettings($endpoints, $options);
+        $this->validator = new MessageValidator(
+            $options['maxBodySizeBytes'] ?? 4194304,
+            $options['validateMessageType'] ?? true
+        );
+        $this->logger = Logger::getInstance('Producer');
+
+        $this->client = RpcClientManager::getInstance()->getClient($endpoints, 
[
+            'tlsCredentials' => $this->settings->getTlsCredentials(),
+            'sslEnabled' => $this->settings->isSslEnabled(),
+        ]);
+
+        $this->telemetrySession = TelemetrySession::getInstance(
+            $this->client, $endpoints, $this->settings->getClientId(),
+            $this->settings->getCredentials(), $this->settings->getNamespace()
+        );
+        $this->routeManager = new PublishingRouteManager($this->client, 
$endpoints, $this);
+        $this->heartbeatManager = new HeartbeatManager(
+            $this->routeManager, $this->client, $this,
+            $this->settings->getTlsCredentials(), 
$this->settings->isSslEnabled()
+        );
+
+        $metadataBuilder = fn(?int $timeoutMs = null) => 
$this->buildMetadata($timeoutMs);
+        $callOptionsResolver = fn(?int $overrideTimeout = null) => 
$this->getCallOptions($overrideTimeout);
+        $operationTimeoutFn = fn(string $op) => 
$this->getOperationTimeout($op);
+        $interceptorExecutor = function (string $hookPoint, array $context = 
[]) {
+            $this->executeInterceptors($hookPoint, $context);
+        };
+
+        $this->sendHandler = new SendMessageHandler(
+            $this->client,
+            $this->settings,
+            $this->validator,
+            $this->routeManager,
+            $interceptorExecutor,
+            $metadataBuilder,
+            $callOptionsResolver,
+            $operationTimeoutFn,
+        );
+
+        $this->recallHandler = new RecallMessageHandler(
+            $this->client,
+            $this->settings,
+            $metadataBuilder,
+            $callOptionsResolver,
+        );
+    }
+
+    // ==================== Lifecycle ====================
+
+    public function start(): void
+    {
+        if ($this->isRunning) {
+            return;
+        }
+
+        try {
+            Logger::getInstance('Producer')->info("Begin to start the rocketmq 
producer, clientId={$this->settings->getClientId()}");
+            $this->establishTelemetrySession();
+            $this->registerSettingsCallback();
+            $this->registerTransactionCheckerCallback();
+
+            $this->routeManager->warmUp($this->settings->getTopics());
+
+            $this->isRunning = true;
+            $this->heartbeatManager->start();
+
+            Logger::getInstance('Producer')->info("The rocketmq producer 
starts successfully, clientId={$this->settings->getClientId()}");
+        } catch (\Exception $e) {
+            Logger::getInstance('Producer')->error("Failed to start: " . 
$e->getMessage());
+            $this->shutdown();
+            throw $e;
+        }
+    }
+
+    public function shutdown(): void
+    {
+        if (!$this->isRunning) {
+            return;
+        }
+
+        $this->shutdownRequested = true;
+        $this->logger->info("Begin to shutdown the rocketmq producer, 
clientId={$this->settings->getClientId()}");
+
+        if (SwooleCompat::isAvailable() && SwooleCompat::inCoroutine()) {
+            \Swoole\Coroutine::sleep(1);
+        }
+
+        $this->heartbeatManager->stop();
+        $this->heartbeatManager->notifyClientTermination();
+
+        if ($this->telemetrySession) {
+            $this->telemetrySession->close();
+        }
+
+        $this->isRunning = false;
+        $this->logger->info("Shutdown the rocketmq producer successfully, 
clientId={$this->settings->getClientId()}");
+    }
+
+    public function __destruct()
+    {
+        $this->shutdown();
+    }
+
+    // ==================== Send ====================
+
+    /**
+     * Send a message
+     * @param Message $message to send
+     * @return array Send result containing:
+     * - messageId: messageId
+     * - messageQueue: messageQueue
+     * - offset: offset
+     * - requestId: requestId
+     * - sendTime: sendTime
+     * - transactionId: transactionId
+     * - transactionState: transactionState
+     * @throws \Exception if producer is not running
+     */
+    public function send(Message $message): array
+    {
+        if (!$this->isRunning) {
+            throw new \RuntimeException("Producer is not running now");
+        }
+        return $this->sendHandler->send($message);
+    }
+
+    /**
+     * Send a message asynchronously
+     * @param Message $message to send
+     * @return \Generator|mixed|null | void
+     */
+    public function sendAsync(Message $message): array|\Generator
+    {
+        if (!$this->isRunning) {
+            throw new \RuntimeException("Producer is not running now");
+        }
+        return $this->sendHandler->sendAsync($message);
+    }
+
+    // ==================== Batch Send ====================
+
+    /**
+     * Send a batch of messages
+     * @param array $messages to send
+     * @return array Send result containing:
+     * @throws \Exception if producer is not running
+     */
+    public function sendBatch(array $messages): array
+    {
+        if (!$this->isRunning) {
+            throw new \RuntimeException("Producer is not running now");
+        }
+        return $this->sendHandler->sendBatch($messages);
+    }
+
+    /**
+     * Send a batch of messages asynchronously
+     * @param array $messages to send
+     * @return \Generator|mixed|null | void
+     */
+    public function sendBatchAsync(array $messages): array|\Generator
+    {
+        if (!$this->isRunning) {
+            throw new \RuntimeException("Producer is not running now");
+        }
+        return $this->sendHandler->sendBatchAsync($messages);
+    }
+
+    // ==================== Convenience Send Methods ====================
+
+    /**
+     * Send a priority message
+     * @param $topic  string Topic name
+     * @param $body  string Message body
+     * @param $priority  int Message priority
+     * @param $tag  string Message tag
+     * @return array Send result containing:
+     * @throws \Exception if producer is not running
+     */
+    public function sendPriorityMessage(string $topic, string $body, int 
$priority, string $tag = ''): array
+    {
+        if (!$this->isRunning) {
+            throw new \RuntimeException("Producer is not running now");
+        }
+        return $this->send($this->sendHandler->buildConvenienceMessage($topic, 
$body, $tag, function (SystemProperties $sp) use ($priority) {
+            $sp->setPriority($priority);

Review Comment:
   Fixed



##########
php/StatusChecker.php:
##########
@@ -0,0 +1,158 @@
+<?php
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache\Rocketmq;
+
+class StatusChecker
+{
+    private static $badRequestCodes = [
+        40000, 40001, 40002, 40004, 40005, 40006,
+    ];
+
+    private static $statusMessageMap = [
+        'BAD_REQUEST' => 1,
+        'ILLEGAL_TOPIC' => 1,
+        'ILLEGAL_CONSUMER_GROUP' => 1,
+        'ILLEGAL_MESSAGE_TAG' => 1,
+        'ILLEGAL_MESSAGE_KEY' => 1,
+        'ILLEGAL_MESSAGE_GROUP' => 1,
+        'ILLEGAL_MESSAGE_PROPERTY_KEY' => 1,
+        'INVALID_TRANSACTION_ID' => 1,
+        'MESSAGE_CORRUPTED' => 1,
+        'ILLEGAL_FILTER_EXPRESSION' => 1,
+        'ILLEGAL_FILTER_SQL92_EXPRESSION' => 1,
+        'INVALID_RECEIPT_HANDLE' => 1,
+        'WRONG_ORGANIZATION' => 1,
+        'ILLEGAL_LITE_TOPIC' => 1,
+        'ILLEGAL_GLOBAL_BID' => 1,
+
+        'UNAUTHORIZED' => 2,
+
+        'PAYMENT_REQUIRED' => 3,
+
+        'FORBIDDEN' => 4,
+        'FORBIDDEN_REUSE' => 4,
+
+        'NOT_FOUND' => 5,
+        'TOPIC_NOT_FOUND' => 5,
+        'CONSUMER_GROUP_NOT_FOUND' => 5,
+
+        'PAYLOAD_TOO_LARGE' => 6,
+        'MESSAGE_BODY_TOO_LARGE' => 6,
+
+        'PAYLOAD_EMPTY' => 7,
+        'MESSAGE_BODY_EMPTY' => 7,
+
+        'TOO_MANY_REQUESTS' => 8,
+
+        'LITE_TOPIC_QUOTA_EXCEEDED' => 9,
+        'LITE_SUBSCRIPTION_QUOTA_EXCEEDED' => 10,
+
+        'REQUEST_HEADER_FIELDS_TOO_LARGE' => 11,
+        'MESSAGE_PROPERTIES_TOO_LARGE' => 11,
+
+        'INTERNAL_ERROR' => 12,
+        'INTERNAL_SERVER_ERROR' => 12,
+        'HA_NOT_AVAILABLE' => 12,
+
+        'PROXY_TIMEOUT' => 13,
+        'MASTER_PERSISTENCE_TIMEOUT' => 13,
+        'SLAVE_PERSISTENCE_TIMEOUT' => 13,
+
+        'UNSUPPORTED' => 14,
+        'VERSION_UNSUPPORTED' => 14,
+        'VERIFY_FIFO_MESSAGE_UNSUPPORTED' => 14,
+    ];
+
+    /**
+     * Check gRPC status and throw appropriate exception on failure.
+     *
+     * @param \Google\Rpc\Status|null $status gRPC status object
+     * @param string $detailMessage Optional additional detail message
+     * @return void
+     * @throws 
BadRequestException|UnauthorizedException|ForbiddenException|NotFoundException|InternalErrorException|TooManyRequestsException|\Exception
+     */
+    public static function check($status, $detailMessage = '')
+    {
+        if ($status === null) {
+            return;
+        }
+
+        $code = $status->getCode();
+        if ($code === 20000) {
+            return;
+        }
+
+        $message = $status->getMessage();
+        if (!empty($detailMessage)) {
+            $message = $message . '; detail: ' . $detailMessage;
+        }
+
+        $exceptionClass = self::resolveExceptionClass($code);
+        throw new $exceptionClass($code, $message);
+    }
+
+    /**
+     * Resolve the exception class name for a given status code.
+     *
+     * @param int $code gRPC status code
+     * @return string Fully qualified exception class name
+     */
+    private static function resolveExceptionClass($code)
+    {
+        if (in_array($code, self::$badRequestCodes)) {
+            return BadRequestException::class;
+        }
+
+        return match ($code) {
+            40100 => UnauthorizedException::class,
+            40200 => PaymentRequiredException::class,
+            40300 => ForbiddenException::class,
+            40400, 40401, 40402 => NotFoundException::class,
+            41300, 41301 => PayloadTooLargeException::class,
+            41400, 41401 => PayloadEmptyException::class,
+            42900 => TooManyRequestsException::class,
+            40901 => LiteTopicQuotaExceededException::class,

Review Comment:
   Fixed



##########
php/MessageValidator.php:
##########
@@ -0,0 +1,128 @@
+<?php
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache\Rocketmq;
+
+use Apache\Rocketmq\V2\Message;
+use Apache\Rocketmq\V2\MessageType as V2MessageType;
+
+/**
+ * MessageValidator - Message validation and type detection.
+ *
+ * Extracted from Producer to separate validation concerns:
+ * 1. validateMessage() - validates topic, body, and body size
+ * 2. detectMessageType() - determines 
NORMAL/FIFO/DELAY/PRIORITY/TRANSACTION/LITE
+ * 3. Configurable max body size and message type validation (updated by 
server settings)
+ */
+class MessageValidator
+{
+    private int $maxBodySizeBytes;
+    private bool $validateMessageType;
+
+    public function __construct(int $maxBodySizeBytes = 4194304, bool 
$validateMessageType = true)
+    {
+        $this->maxBodySizeBytes = $maxBodySizeBytes;
+        $this->validateMessageType = $validateMessageType;
+    }
+
+    /**
+     * Validate a message before sending.
+     *
+     * @param Message $message The message to validate
+     * @throws \InvalidArgumentException If validation fails
+     */
+    public function validateMessage(Message $message): void
+    {
+        if (!$message->hasTopic() || 
empty(trim($message->getTopic()->getName()))) {
+            throw new \InvalidArgumentException("Message topic is required");
+        }
+        if (empty($message->getBody())) {

Review Comment:
   Fixed



##########
php/LitePushConsumer.php:
##########
@@ -0,0 +1,334 @@
+<?php
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache\Rocketmq;
+
+use Apache\Rocketmq\V2\SyncLiteSubscriptionRequest;
+use Apache\Rocketmq\V2\LiteSubscriptionAction;
+use Apache\Rocketmq\V2\Resource;
+use Apache\Rocketmq\V2\ClientType;
+
+/**
+ * LitePushConsumer - Push consumer for lite topics.
+ *
+ * Extends PushConsumer with dynamic lite topic subscription management.
+ * Instead of creating many physical topics, Lite consumers use a parent topic
+ * with logical lite topic sub-classifiers.
+ *
+ * Usage:
+ *   $consumer = new LitePushConsumer($endpoints, $consumerGroup, 
$parentTopic);
+ *   $consumer->subscribeLite('lite-topic-1', $callback);
+ *   $consumer->subscribeLite('lite-topic-2', $callback);
+ *   $consumer->start();
+ */
+class LitePushConsumer extends PushConsumer
+{
+    private readonly string $parentTopic;
+    private array $liteTopics = [];
+    private readonly int $liteSubscriptionQuota;
+    private readonly int $maxLiteTopicSize;
+    private int $syncLiteSubscriptionInterval = 30;
+    /** @var callable|null per-lite-topic callback */
+    private ?\Closure $liteMessageListener = null;
+    private int $lastSyncTime = 0;
+    private ?ProcessQueue $virtualProcessQueue = null; // ProcessQueue|null
+
+    /**
+     * Constructor.
+     *
+     * @param string $endpoints gRPC server endpoint
+     * @param string $consumerGroup Consumer group name
+     * @param string $parentTopic Parent (bound) topic
+     * @param array $options Configuration options
+     *  - clientId: string, custom client identifier (default: 
'php-push-consumer-{pid}-{time}')
+     *  - messageListener: callable|null, message consumption callback
+     *  - maxCacheMessageCount: int, max cached messages in memory (default: 
4096)
+     *  - maxCacheMessageSizeInBytes: int, max cached message total size 
(default: 67108864, 64MB)
+     *  - awaitDuration: int, long polling timeout in seconds (default: 5)
+     *  - scanIntervalSeconds: int, assignment scan interval in seconds 
(default: 5)
+     *  - receiveBatchSize: int, max messages per receive batch (default: 32)
+     *  - enableFifoConsumeAccelerator: bool, enable FIFO consume accelerator 
(default: true for Lite)
+     *  - credentials: SessionCredentials|null, AK/SK authentication 
credentials
+     *  - namespace: string, resource namespace prefix (default: '')
+     *  - tlsCredentials: TlsCredentials|null, TLS/SSL configuration
+     *  - sslEnabled: bool, enable SSL for gRPC channel (default: true)
+     *  - liteSubscriptionQuota: int, max number of lite topic subscriptions 
(default: 0 = unlimited)
+     *  - maxLiteTopicSize: int, max length of lite topic name (default: 64)
+     */
+    public function __construct(string $endpoints, string $consumerGroup, 
$parentTopic, array $options = [])
+    {
+        if (empty(trim($parentTopic))) {
+            throw new \InvalidArgumentException("LitePushConsumer parentTopic 
cannot be empty");
+        }
+        $this->parentTopic = $parentTopic;
+        $listener = $options['messageListener'] ?? null;
+        $this->liteMessageListener = $listener !== null
+            ? ($listener instanceof \Closure ? $listener : 
\Closure::fromCallable($listener))
+            : null;
+
+        $liteOptions = array_merge($options, [
+            'subscriptionExpressions' => [$parentTopic => '*'],
+            'fifo' => true,
+            'isLiteConsumer' => true,
+            'enableFifoConsumeAccelerator' => 
$options['enableFifoConsumeAccelerator'] ?? true,
+            'messageListener' => $this->liteMessageListener,
+        ]);
+
+        parent::__construct($endpoints, $consumerGroup, $liteOptions);
+
+        $this->liteSubscriptionQuota = $options['liteSubscriptionQuota'] ?? 0;
+        $this->maxLiteTopicSize = $options['maxLiteTopicSize'] ?? 64;
+    }
+
+    /**
+     * Subscribe to a lite topic.
+     *
+     * @param string $liteTopic Lite topic name
+     * @param callable|null $listener Optional per-lite-topic callback
+     * @return $this
+     */
+    public function subscribeLite(string $liteTopic, ?callable $listener = 
null): self
+    {
+        $this->checkNotRunning();
+
+        if (strlen($liteTopic) > $this->maxLiteTopicSize) {
+            throw new \RuntimeException("Lite topic name exceeds max length of 
{$this->maxLiteTopicSize}");
+        }
+
+        if ($this->liteSubscriptionQuota > 0 && count($this->liteTopics) >= 
$this->liteSubscriptionQuota) {
+            throw new \RuntimeException("Lite subscription quota exceeded: 
{$this->liteSubscriptionQuota}");
+        }
+
+        $this->liteTopics[$liteTopic] = $listener;
+
+        return $this;
+    }
+
+    /**
+     * Get the client type for this consumer.
+     *
+     * @return int ClientType::LITE_PUSH_CONSUMER
+     */
+    protected function getClientType(): int
+    {
+        return ClientType::LITE_PUSH_CONSUMER;
+    }
+
+    /**
+     * Unsubscribe from a lite topic.
+     *
+     * @param string $liteTopic Lite topic name to remove
+     * @return $this
+     */
+    public function unsubscribeLite(string $liteTopic): self
+    {
+        $this->checkNotRunning();
+        unset($this->liteTopics[$liteTopic]);
+        return $this;
+    }
+
+    /**
+     * Get subscribed lite topics.
+     *
+     * @return array
+     */
+    public function getLiteTopics(): array
+    {
+        return array_keys($this->liteTopics);
+    }
+
+    /**
+     * Set the global lite message listener (used when no per-lite-topic 
listener is set).
+     *
+     * @param callable $listener Callback invoked for messages with no 
per-lite-topic listener
+     * @return $this
+     */
+    public function setLiteMessageListener(callable $listener): self
+    {
+        $this->liteMessageListener = $listener instanceof \Closure
+            ? $listener
+            : \Closure::fromCallable($listener);
+        return $this;
+    }
+
+    /**
+     * Start the LitePushConsumer.
+     *
+     * Overrides parent start() to sync lite subscriptions, register handlers, 
and use lite-aware consume service.
+     */
+    public function start(): void
+    {
+        if ($this->isRunning()) {
+            return;
+        }
+
+        if (empty($this->liteTopics)) {
+            throw new \RuntimeException("LitePushConsumer has no lite topics 
subscribed");
+        }
+
+        if ($this->liteMessageListener === null) {
+            throw new \RuntimeException("LitePushConsumer has no lite message 
listener");
+        }
+
+        $this->logger->info("LitePushConsumer starting, 
clientId={$this->getClientId()}, parentTopic={$this->parentTopic}");
+        parent::start();
+    }
+
+    /**
+     * Setup before the main scan loop: sync lite subscriptions and wait for 
assignments.
+     */
+    protected function onStartBeforeLoop(): void
+    {
+        $self = $this;
+        $this->telemetrySession->setOnNotifyUnsubscribeLite(function 
($notifyCmd) use ($self) {
+            $liteTopic = $notifyCmd->getLiteTopic();
+            $self->logger->info("Received NotifyUnsubscribeLite for 
liteTopic={$liteTopic}");
+            $self->handleUnsubscribeLite($liteTopic);
+        });
+        $this->syncLiteSubscriptions();
+        $this->lastSyncTime = time();
+        $pollInterval = 500000;
+        $maxAttempts = 10;
+        for ($attempt = 0; $attempt < $maxAttempts; $attempt++) {
+            try {
+                $assignments = $this->queryLiteAssignment();
+                $assignmentList = $assignments ? 
ProtobufUtil::repeatedFieldToArray($assignments->getAssignments()) : [];
+                if (!empty($assignmentList)) {
+                    $this->logger->info("Lite subscription active after " . 
($attempt + 500) . "ms" . count($assignmentList) . " assignments");
+                    return;
+                }
+            } catch (\Exception $e) {
+                $this->logger->error("Error querying lite subscription: " . 
$e->getMessage());
+            }
+            $this->logger->debug("Waiting doe lite subscription to take 
effect, attempt {$attempt}/{$maxAttempts}");
+            SwooleCompat::sleep($pollInterval);
+        }
+        $this->logger->error("Lite subscription time out waiting assignments, 
will scan during normal cycle");
+    }
+
+    /**
+     * Handle server-initiated lite topic unsubscription.
+     *
+     * @param string $liteTopic Lite topic name to remove from subscriptions
+     * @return void
+     */
+    public function handleUnsubscribeLite(string $liteTopic): void
+    {
+        if (array_key_exists($liteTopic, $this->liteTopics)) {
+            unset($this->liteTopics[$liteTopic]);
+            $this->logger->info("Unsubscribed from lite topic: {$liteTopic}");
+        }
+    }
+
+    /**
+     * Hook called after each scan cycle to perform periodic lite sync.
+     */
+    protected function onScanCycleComplete(): void
+    {
+        $now = time();
+        if (!empty($this->liteTopics) && ($now - $this->lastSyncTime) >= 
$this->syncLiteSubscriptionInterval) {
+            $this->syncLiteSubscriptions();
+            $this->lastSyncTime = $now;
+        }
+    }
+
+    /**
+     * Sync lite subscriptions to server via SyncLiteSubscription gRPC.
+     */
+    public function syncLiteSubscriptions(): void
+    {
+        if (empty($this->liteTopics)) {
+            return;
+        }
+
+        $topicResource = new Resource();
+        $topicResource->setName($this->parentTopic);
+
+        $groupResource = new Resource();
+        $groupResource->setName($this->consumerGroup);
+
+        $request = new SyncLiteSubscriptionRequest();
+        $request->setAction(LiteSubscriptionAction::COMPLETE_ADD);
+        $request->setTopic($topicResource);
+        $request->setGroup($groupResource);
+        $request->setLiteTopicSet(array_keys($this->liteTopics));
+
+        $metadata = 
$this->buildMetadata(ClientConstants::GRPC_SYNC_LITE_MESSAGE_TIMEOUT / 1000);
+
+        try {
+            list($response, $status) = 
$this->getClient()->SyncLiteSubscription($request, $metadata, 
$this->getCallOptions())->wait();

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to