Repository: incubator-rocketmq-externals Updated Branches: refs/heads/master ed9ad5181 -> 950af6e96
[ROCKETMQ-171] Initialized the PHP_SDK basic structure closes apache/incubator-rocketmq-externals#9 Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/950af6e9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/950af6e9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/950af6e9 Branch: refs/heads/master Commit: 950af6e963f166a686b34b26f2f5d8108fc9c55b Parents: ed9ad51 Author: netroby <[email protected]> Authored: Tue Apr 11 16:40:04 2017 +0800 Committer: dongeforever <[email protected]> Committed: Tue Apr 11 16:40:04 2017 +0800 ---------------------------------------------------------------------- rocketmq-php/.gitignore | 4 +- rocketmq-php/composer.json | 16 ++ rocketmq-php/example/simple/AsyncProducer.php | 53 +++++ rocketmq-php/example/simple/Producer.php | 36 ++++ rocketmq-php/readme.md | 3 + .../src/Client/Common/ClientErrorCode.php | 27 +++ .../src/Client/Exception/MQBrokerException.php | 23 ++ .../src/Client/Exception/MQClientException.php | 28 +++ .../src/Client/Latency/MQFaultStrategy.php | 107 ++++++++++ .../src/Client/Producer/DefaultMQProducer.php | 135 ++++++++++++ rocketmq-php/src/Client/Producer/SendStatus.php | 26 +++ rocketmq-php/src/Common/CommunicationMode.php | 25 +++ rocketmq-php/src/Common/Help/FAQUrl.php | 81 ++++++++ rocketmq-php/src/Common/Message/Message.php | 208 +++++++++++++++++++ .../src/Common/Message/MessageConst.php | 71 +++++++ rocketmq-php/src/Common/System.php | 30 +++ .../src/Remoting/Common/RemotingHelper.php | 23 ++ 17 files changed, 895 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/950af6e9/rocketmq-php/.gitignore ---------------------------------------------------------------------- diff --git a/rocketmq-php/.gitignore 
b/rocketmq-php/.gitignore index d4baaf0..c61f94d 100644 --- a/rocketmq-php/.gitignore +++ b/rocketmq-php/.gitignore @@ -6,4 +6,6 @@ coverage.out *.log tags temp_parser_file -y.output \ No newline at end of file +y.output +/vendor/ +.vscode/ http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/950af6e9/rocketmq-php/composer.json ---------------------------------------------------------------------- diff --git a/rocketmq-php/composer.json b/rocketmq-php/composer.json new file mode 100644 index 0000000..65d6566 --- /dev/null +++ b/rocketmq-php/composer.json @@ -0,0 +1,16 @@ +{ + "name": "rocketmq/rocketmq-php-sdk", + "description": "RocketMQ PHP SDK written with pure php code.", + "type": "library", + "license": "Apache-2.0", + "authors": [{ + "name": "huzhifeng", + "email": "[email protected]" + }], + "require": {}, + "autoload": { + "psr-4": { + "RocketMQ\\": "src/" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/950af6e9/rocketmq-php/example/simple/AsyncProducer.php ---------------------------------------------------------------------- diff --git a/rocketmq-php/example/simple/AsyncProducer.php b/rocketmq-php/example/simple/AsyncProducer.php new file mode 100644 index 0000000..1727835 --- /dev/null +++ b/rocketmq-php/example/simple/AsyncProducer.php @@ -0,0 +1,53 @@ +<?php +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use RocketMQ\Client\Producer\DefaultMQProducer; +use RocketMQ\Common\Message\Message; +use RocketMQ\Remoting\Common\RemotingHelper; + +$producer = new DefaultMQProducer("Jodie_Daily_test"); +$producer->start(); +$producer->setRetryTimesWhenSendAsyncFailed(0); + +for ($i = 0; $i < 10000000; $i++) { + + try { + $index = $i; + $msg = new Message("Jodie_topic_1023", + "TagA", + "OrderID188", + "Hello world" . getBytes(RemotingHelper::DEFAULT_CHARSET)); + $producer->send($msg, new class() extends SendCallback() { + public + function onSuccess($sendResult) + { + printf("%-10d OK %s %n", $index, $sendResult->getMsgId()); + } + + public + function onException($e) + { + printf("%-10d Exception %s %n", $index, $e); + $e->printStackTrace(); + } + }); +} catch (\Exception $e) { + echo $e->getTraceAsString(); + } + +} +$producer->shutdown(); \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/950af6e9/rocketmq-php/example/simple/Producer.php ---------------------------------------------------------------------- diff --git a/rocketmq-php/example/simple/Producer.php b/rocketmq-php/example/simple/Producer.php new file mode 100644 index 0000000..931c50d --- /dev/null +++ b/rocketmq-php/example/simple/Producer.php @@ -0,0 +1,36 @@ +<?php +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use RocketMQ\Client\Producer\DefaultMQProducer; +use RocketMQ\Common\Message\Message; + +$producer = new DefaultMQProducer("ProducerGroupName"); +$producer->start(); + +for ($i = 0; $i < 10000000; $i++) { + try { + $msg = new Message("TopicTest", + "TagA", + "OrderID188", + "Hello world"); + $sendResult = $producer->send($msg); + echo $sendResult; + } catch (\Exception $e) { + echo $e->getMessage() . PHP_EOL . $e->getTraceAsString(); + } +} +$producer->shutdown(); \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/950af6e9/rocketmq-php/readme.md ---------------------------------------------------------------------- diff --git a/rocketmq-php/readme.md b/rocketmq-php/readme.md new file mode 100644 index 0000000..a209fda --- /dev/null +++ b/rocketmq-php/readme.md @@ -0,0 +1,3 @@ +# RocketMQ PHP SDK + +This is PHP SDK for RocketMQ. Written with pure PHP language. 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/950af6e9/rocketmq-php/src/Client/Common/ClientErrorCode.php ---------------------------------------------------------------------- diff --git a/rocketmq-php/src/Client/Common/ClientErrorCode.php b/rocketmq-php/src/Client/Common/ClientErrorCode.php new file mode 100644 index 0000000..f06cf52 --- /dev/null +++ b/rocketmq-php/src/Client/Common/ClientErrorCode.php @@ -0,0 +1,27 @@ +<?php +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +namespace RocketMQ\Client\Common; + +class ClientErrorCode +{ + const CONNECT_BROKER_EXCEPTION = 10001; + const ACCESSS_BROKER_TIMEOUT = 10002; + const BROKER_NOT_EXIST_EXCEPTION = 10003; + const NO_NAME_SERVER_EXCEPTION = 10004; + const NOT_FOUND_TOPIC_EXCEPTION = 10005; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/950af6e9/rocketmq-php/src/Client/Exception/MQBrokerException.php ---------------------------------------------------------------------- diff --git a/rocketmq-php/src/Client/Exception/MQBrokerException.php b/rocketmq-php/src/Client/Exception/MQBrokerException.php new file mode 100644 index 0000000..7d6fa95 --- /dev/null +++ b/rocketmq-php/src/Client/Exception/MQBrokerException.php @@ -0,0 +1,23 @@ +<?php +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +namespace RocketMQ\Client\Exception; + +class MQBrokerException extends \Exception +{ + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/950af6e9/rocketmq-php/src/Client/Exception/MQClientException.php ---------------------------------------------------------------------- diff --git a/rocketmq-php/src/Client/Exception/MQClientException.php b/rocketmq-php/src/Client/Exception/MQClientException.php new file mode 100644 index 0000000..0857e4c --- /dev/null +++ b/rocketmq-php/src/Client/Exception/MQClientException.php @@ -0,0 +1,28 @@ +<?php +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +namespace RocketMQ\Client\Exception; + +class MQClientException extends \Exception +{ + + public function setResponseCode($code) + { + $this->responseCode = $code; + return $this; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/950af6e9/rocketmq-php/src/Client/Latency/MQFaultStrategy.php ---------------------------------------------------------------------- diff --git a/rocketmq-php/src/Client/Latency/MQFaultStrategy.php b/rocketmq-php/src/Client/Latency/MQFaultStrategy.php new file mode 100644 index 0000000..3f05b7f --- /dev/null +++ b/rocketmq-php/src/Client/Latency/MQFaultStrategy.php @@ -0,0 +1,107 @@ +<?php +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +namespace RocketMQ\Client\Latency; + +class MQFaultStrategy +{ + public $log; + public $latencyFaultTolerance; + public $sendLatencyFaultEnable = false; + public $latencyMax = [50, 100, 550, 1000, 2000, 3000, 15000]; + public $notAvailableDuration = [0, 0, 30000, 60000, 120000, 180000, 600000]; + public function __construct() + { + $this->log = ClientLogger.getLog(); + $this->latencyFaultTolerance = new LatencyFaultToleranceImpl(); + } + public function getNotAvailableDuration() + { + return $this->notAvailableDuration; + } + public function setNotAvailableDuration($notAvailableDuration) + { + $this->notAvailableDuration = $notAvailableDuration; + } + + public function getLatencyMax() + { + return $this->latencyMax; + } + + public function setLatencyMax($latencyMax) + { + $this->latencyMax = $latencyMax; + } + + public function isSendLatencyFaultEnable() + { + return $this->sendLatencyFaultEnable; + } + public function setSendLatencyFaultEnable($sendLatencyFaultEnable) + { + $this->sendLatencyFaultEnable = $sendLatencyFaultEnable; + } + public function selectOneMessageQueue($tpInfo, $lastBrokerName) + { + if ($this->sendLatencyFaultEnable) { + try { + $index = $tpInfo->getSendWhichQueue()->getAndIncrement(); + for ($i = 0; $i < strlen($tpInfo->getMessageQueueList()); $i++) { + $pos = abs($index++) % strlen(tpInfo.getMessageQueueList()); + if ($pos < 0) + $pos = 0; + $mq = $tpInfo->getMessageQueueList()->get($pos); + if ($this->latencyFaultTolerance->isAvailable($mq->getBrokerName())) { + if (null == $lastBrokerName || $mq->getBrokerName() == $lastBrokerName) + return $mq; + } + } + + $notBestBroker = $this->latencyFaultTolerance->pickOneAtLeast(); + $writeQueueNums = $tpInfo->getQueueIdByBroker($notBestBroker); + if ($writeQueueNums > 0) { + $mq = $tpInfo->selectOneMessageQueue(); + if ($notBestBroker != null) { + $mq->setBrokerName($notBestBroker); + $mq->setQueueId($tpInfo->getSendWhichQueue()->getAndIncrement() % $writeQueueNums); + } + return $mq; + } + 
else { + $this->latencyFaultTolerance->remove($notBestBroker); + } + } + catch (\Exception $e) { + $this->log->error("Error occurred when selecting message queue", e); + } + + return $this->tpInfo->selectOneMessageQueue(); + } + + return $this->tpInfo->selectOneMessageQueue($lastBrokerName); + } + public function updateFaultItem($brokerName, $currentLatency, $isolation) + { + + } + + public function computeNotAvailableDuration($currentLatency) + { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/950af6e9/rocketmq-php/src/Client/Producer/DefaultMQProducer.php ---------------------------------------------------------------------- diff --git a/rocketmq-php/src/Client/Producer/DefaultMQProducer.php b/rocketmq-php/src/Client/Producer/DefaultMQProducer.php new file mode 100644 index 0000000..5534486 --- /dev/null +++ b/rocketmq-php/src/Client/Producer/DefaultMQProducer.php @@ -0,0 +1,135 @@ +<?php +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +namespace RocketMQ\Client\Producer; + +use RocketMQ\Client\Common\ClientErrorCode; +use RocketMQ\Client\Exception\MQClientException; +use RocketMQ\Client\Latency\MQFaultStrategy; +use RocketMQ\Common\CommunicationMode; +use RocketMQ\Common\Message\Message; +use RocketMQ\Common\System; + +class DefaultMQProducer +{ + + public $mqFaultStrategy; + public function __construct() + { + $this->mqfaultStrategy = new MQFaultStrategy(); + + } + public function start() + { + + } + + public function shutdown() + { + + } + + + public function updateFaultItem($brokerName, $currentLatency, $isolation) + { + + } + /** + * @param Message $msg + */ + public function send($msg, $communicationMode = CommunicationMode::SYNC, $sendCallback = null, $timeout = null) + { + $this->makeSureStateOK(); + Validators::checkMessage($msg, $this->defaultMQProducer); + + $invokeID = random::nextLong(); + $beginTimestampFirst = System::currentTimeMillis(); //System.currentTimeMillis() + $beginTimestampPrev = $beginTimestampFirst; + $endTimestamp = $beginTimestampFirst; + $topicPublishInfo = $this->tryToFindTopicPublishInfo($msg->getTopic()); + if ($topicPublishInfo != null && $topicPublishInfo->ok()) { + $mq = null; + $exception = null; + $sendResult = null; + $timesTotal = $communicationMode === CommunicationMode::SYNC ? 1 + $this->defaultMQProducer->getRetryTimesWhenSendFailed() : 1; + $times = 0; + $brokersSent = ''; + for (; $times < $timesTotal; $times++) { + $lastBrokerName = null == $mq ? 
null : $mq->getBrokerName(); + $tmpmq = $this->selectOneMessageQueue($topicPublishInfo, $lastBrokerName); + if ($tmpmq != null) { + $mq = $tmpmq; + $brokersSent[$times] = $mq->getBrokerName(); + try { + $beginTimestampPrev = System::currentTimeMillis(); + $sendResult = $this->sendKernelImpl($msg, $mq, $communicationMode, $sendCallback, + $topicPublishInfo, $timeout); + $endTimestamp = System::currentTimeMillis(); + $this->updateFaultItem($mq->getBrokerName(), $endTimestamp - $beginTimestampPrev, false); + switch ($communicationMode) { + case CommunicationMode::ASYNC: + return null; + case CommunicationMode::ONEWAY: + return null; + case CommunicationMode::SYNC: + if ($sendResult->getSendStatus() != SendStatus::SEND_OK) { + $this->defaultMQProducer->isRetryAnotherBrokerWhenNotStoreOK(); + } + + return $sendResult; + default: + break; + } + } catch (\Exception $e) { + $endTimestamp = System::currentTimeMillis(); + $this->updateFaultItem($mq->getBrokerName(), $endTimestamp - $beginTimestampPrev, true); + $this->log->warn(sprintf("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", + invokeID, endTimestamp - beginTimestampPrev, mq), $e); + $this->log->warn($msg->toString()); + + + if ($sendResult != null) { + return $sendResult; + } + + $info = sprintf("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", + $times, + System::currentTimeMillis() - $beginTimestampFirst, + $msg->getTopic(), + var_export($brokersSent, 1)); + + $info += FAQUrl::suggestTodo(FAQUrl::SEND_MSG_FAILED); + + $mqClientException = new MQClientException($info, $exception); + + throw $mqClientException; + } + + $nsList = $this->getmQClientFactory()->getMQClientAPIImpl()->getNameServerAddressList(); + if (null == $nsList || $nsList->isEmpty()) { + throw (new MQClientException( + "No name server address, please set it->" + FAQUrl::suggestTodo(FAQUrl::NAME_SERVER_ADDR_NOT_EXIST_URL), + null))->setResponseCode(ClientErrorCode::NO_NAME_SERVER_EXCEPTION); + } 
+ + throw (new MQClientException("No route info of this topic, " + $msg->getTopic() + FAQUrl::suggestTodo(FAQUrl::NO_TOPIC_ROUTE_INFO), + null))->setResponseCode(ClientErrorCode::NOT_FOUND_TOPIC_EXCEPTION); + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/950af6e9/rocketmq-php/src/Client/Producer/SendStatus.php ---------------------------------------------------------------------- diff --git a/rocketmq-php/src/Client/Producer/SendStatus.php b/rocketmq-php/src/Client/Producer/SendStatus.php new file mode 100644 index 0000000..0527d42 --- /dev/null +++ b/rocketmq-php/src/Client/Producer/SendStatus.php @@ -0,0 +1,26 @@ +<?php +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +namespace RocketMQ\Client\Producer; + +class SendStatus +{ + const SEND_OK = 0; + const FLUSH_DISK_TIMEOUT = 1; + const FLUSH_SLAVE_TIMEOUT = 2; + const SLAVE_NOT_AVAILABLE = 3; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/950af6e9/rocketmq-php/src/Common/CommunicationMode.php ---------------------------------------------------------------------- diff --git a/rocketmq-php/src/Common/CommunicationMode.php b/rocketmq-php/src/Common/CommunicationMode.php new file mode 100644 index 0000000..ab7cc33 --- /dev/null +++ b/rocketmq-php/src/Common/CommunicationMode.php @@ -0,0 +1,25 @@ +<?php +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +namespace RocketMQ\Common; + +class CommunicationMode +{ + const SYNC = 0; + const ASYNC = 1; + const ONEWAY = 2; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/950af6e9/rocketmq-php/src/Common/Help/FAQUrl.php ---------------------------------------------------------------------- diff --git a/rocketmq-php/src/Common/Help/FAQUrl.php b/rocketmq-php/src/Common/Help/FAQUrl.php new file mode 100644 index 0000000..fbfbc65 --- /dev/null +++ b/rocketmq-php/src/Common/Help/FAQUrl.php @@ -0,0 +1,81 @@ +<?php +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +namespace RocketMQ\Common\Help; + +class FAQUrl +{ + const APPLY_TOPIC_URL = // + "http://rocketmq.apache.org/docs/faq/"; + + const NAME_SERVER_ADDR_NOT_EXIST_URL = // + "http://rocketmq.apache.org/docs/faq/"; + + const GROUP_NAME_DUPLICATE_URL = // + "http://rocketmq.apache.org/docs/faq/"; + + const CLIENT_PARAMETER_CHECK_URL = // + "http://rocketmq.apache.org/docs/faq/"; + + const SUBSCRIPTION_GROUP_NOT_EXIST = // + "http://rocketmq.apache.org/docs/faq/"; + + const CLIENT_SERVICE_NOT_OK = // + "http://rocketmq.apache.org/docs/faq/"; + + // FAQ: No route info of this topic, TopicABC + const NO_TOPIC_ROUTE_INFO = // + "http://rocketmq.apache.org/docs/faq/"; + + const LOAD_JSON_EXCEPTION = // + "http://rocketmq.apache.org/docs/faq/"; + + const SAME_GROUP_DIFFERENT_TOPIC = // + "http://rocketmq.apache.org/docs/faq/"; + + const MQLIST_NOT_EXIST = // + "http://rocketmq.apache.org/docs/faq/"; + + const UNEXPECTED_EXCEPTION_URL = // + "http://rocketmq.apache.org/docs/faq/"; + + const SEND_MSG_FAILED = // + "http://rocketmq.apache.org/docs/faq/"; + + const UNKNOWN_HOST_EXCEPTION = // + "http://rocketmq.apache.org/docs/faq/"; + + const TIP_STRING_BEGIN = "\nSee "; + const TIP_STRING_END = " for further details."; + + public static function suggestTodo($url) + { + return static::TIP_STRING_BEGIN . $url . static::TIP_STRING_END; + } + + public static function attachDefaultURL($errorMessage) + { + if ($errorMessage !== null) { + $index = strpos($errorMessage, static::TIP_STRING_BEGIN); + if (false === $index) { + return $errorMessage . "\n" . "For more information, please visit the url, " . 
static::UNEXPECTED_EXCEPTION_URL; + } + } + + return $errorMessage; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/950af6e9/rocketmq-php/src/Common/Message/Message.php ---------------------------------------------------------------------- diff --git a/rocketmq-php/src/Common/Message/Message.php b/rocketmq-php/src/Common/Message/Message.php new file mode 100644 index 0000000..eb6c3f6 --- /dev/null +++ b/rocketmq-php/src/Common/Message/Message.php @@ -0,0 +1,208 @@ +<?php +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +namespace RocketMQ\Common\Message; + +class Message +{ + + private $serialVersionUID = 8445773977080406428; + private $topic; + private $flag; + private $properties; + private $body; + + public function __construct(...$args) + { + $c = count($args); + switch ($c) { + case 0: + break; + case 2: + $this->initMessage($args[0], "", "", 0, $args[1], true); + break; + case 3: + $this->initMessage($args[0], $args[1], "", 0, $args[2], true); + break; + case 4: + $this->initMessage($args[0], $args[1], $args[2], 0, $args[3], true); + break; + + } + + } + + public function initMessage($topic, $tags, $keys = '', $flag = 0, $body, $waitStoreMsgOK) + { + $this->topic = $topic; + $this->flag = $flag; + $this->body = $body; + if (null !== $tags && strlen($tags) > 0) { + $this->setTags($tags); + } + if (null !== $keys && strlen($keys) > 0) { + $this->setKeys($keys); + } + $this->setWaitStoreMsgOK($waitStoreMsgOK); + } + + public function putProperty($name, $value) + { + if (null === $this->properties) { + $this->properties = []; + } + $this->properties[$name] = $value; + } + + public function clearProperty($name) + { + if (null !== $this->properties) { + unset($this->properties[$name]); + } + } + + public function putUserProperty($name, $value) + { + //TODO + } + + public function getUserProperty($name) + { + return $this->getProperty($name); + } + + public function getProperty($name) + { + if (null === $this->properties) { + $this->properties = []; + } + return (array_key_exists($name, $this->properties)) ?
$this->properties[$name] : null; + } + + public function getTopic() + { + return $this->topic; + } + + public function setTopic($topic) + { + $this->topic = $topic; + } + + public function getTags() + { + return $this->getProperty(MessageConst::PROPERTY_TAGS); + } + + public function setTags($tags) + { + $this->putProperty(MessageConst::PROPERTY_TAGS, $tags); + } + + public function setKeys($keys) + { + $keys = is_array($keys) ? implode(MessageConst::KEY_SEPARATOR, $keys) : $keys; + $this->putProperty(MessageConst::PROPERTY_KEYS, $keys); + } + + public function getKeys() + { + return $this->getProperty(MessageConst::PROPERTY_KEYS); + } + + public function setWaitStroreMsgOk($waitStoreMsgOK) + { + + } + + public function getDelayTimeLevel() + { + $t = $this->getProperty(MessageConst::PROPERTY_DELAY_TIME_LEVEL); + if ($t !== null) { + return (int)$t; + } + + return 0; + } + + public function setDelayTimeLevel($level) + { + $this->putProperty(MessageConst::PROPERTY_DELAY_TIME_LEVEL, $level); + } + + public function isWaitStoreMsgOK() + { + $result = $this->getProperty(MessageConst::PROPERTY_WAIT_STORE_MSG_OK); + if (null === $result) { + return true; + } + + return (bool)$result; + } + + public function setWaitStoreMsgOK($waitStoreMsgOK) + { + $this->putProperty(MessageConst::PROPERTY_WAIT_STORE_MSG_OK, $waitStoreMsgOK); + } + + public function getFlag() + { + return $this->flag; + } + + public function setFlag($flag) + { + $this->flag = $flag; + } + + public function getBody() + { + return $this->body; + } + + public function setBody($body) + { + $this->body = $body; + } + + public function getProperties() + { + return $this->properties; + } + + public function setProperties($properties) + { + $this->properties = $properties; + } + + public function getBuyerId() + { + return $this->getProperty(MessageConst::PROPERTY_BUYER_ID); + } + + public function setBuyerId($buyerId) + { + $this->putProperty(MessageConst::PROPERTY_BUYER_ID, $buyerId); + } + + /** @deprecated Misspelled alias kept for backward compatibility; use getTopic(). */ public function gtTopic() { return $this->getTopic(); } + public function toString() + { + return
"Message [topic=" . $this->topic . ", flag=" . $this->flag . ", properties=" . var_export($this->properties, true) . ", body=" + . ($this->body != null ? strlen($this->body) : 0) . "]"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/950af6e9/rocketmq-php/src/Common/Message/MessageConst.php ---------------------------------------------------------------------- diff --git a/rocketmq-php/src/Common/Message/MessageConst.php b/rocketmq-php/src/Common/Message/MessageConst.php new file mode 100644 index 0000000..fbd32b2 --- /dev/null +++ b/rocketmq-php/src/Common/Message/MessageConst.php @@ -0,0 +1,71 @@ +<?php +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
namespace RocketMQ\Common\Message;

/**
 * Names of the message properties used by the client, plus the separator
 * used when several message keys are joined into one string.
 */
class MessageConst
{
    const PROPERTY_KEYS = "KEYS";
    const PROPERTY_TAGS = "TAGS";
    const PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
    const PROPERTY_DELAY_TIME_LEVEL = "DELAY";
    const PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
    const PROPERTY_REAL_TOPIC = "REAL_TOPIC";
    const PROPERTY_REAL_QUEUE_ID = "REAL_QID";
    const PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
    const PROPERTY_PRODUCER_GROUP = "PGROUP";
    const PROPERTY_MIN_OFFSET = "MIN_OFFSET";
    const PROPERTY_MAX_OFFSET = "MAX_OFFSET";
    const PROPERTY_BUYER_ID = "BUYER_ID";
    const PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";
    const PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG";
    const PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG";
    const PROPERTY_MQ2_FLAG = "MQ2_FLAG";
    const PROPERTY_RECONSUME_TIME = "RECONSUME_TIME";
    const PROPERTY_MSG_REGION = "MSG_REGION";
    const PROPERTY_TRACE_SWITCH = "TRACE_ON";
    const PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";
    const PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES";
    const PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME";

    /** Separator placed between individual keys when they are joined. */
    const KEY_SEPARATOR = " ";

    /**
     * List of all PROPERTY_* names declared in this class.
     *
     * Each entry is qualified with self:: because an unqualified name inside a
     * class is looked up as a namespace/global constant, not as a class constant.
     */
    const STRING_HASH_SET = [
        self::PROPERTY_TRACE_SWITCH,
        self::PROPERTY_MSG_REGION,
        self::PROPERTY_KEYS,
        self::PROPERTY_TAGS,
        self::PROPERTY_WAIT_STORE_MSG_OK,
        self::PROPERTY_DELAY_TIME_LEVEL,
        self::PROPERTY_RETRY_TOPIC,
        self::PROPERTY_REAL_TOPIC,
        self::PROPERTY_REAL_QUEUE_ID,
        self::PROPERTY_TRANSACTION_PREPARED,
        self::PROPERTY_PRODUCER_GROUP,
        self::PROPERTY_MIN_OFFSET,
        self::PROPERTY_MAX_OFFSET,
        self::PROPERTY_BUYER_ID,
        self::PROPERTY_ORIGIN_MESSAGE_ID,
        self::PROPERTY_TRANSFER_FLAG,
        self::PROPERTY_CORRECTION_FLAG,
        self::PROPERTY_MQ2_FLAG,
        self::PROPERTY_RECONSUME_TIME,
        self::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
        self::PROPERTY_MAX_RECONSUME_TIMES,
        self::PROPERTY_CONSUME_START_TIMESTAMP
    ];
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
namespace RocketMQ\Common;

/**
 * Small helpers mirroring java.lang.System for code ported from Java.
 */
class System
{
    /**
     * Returns the current time in milliseconds since the Unix epoch,
     * like Java's System.currentTimeMillis().
     *
     * @return float whole number of milliseconds (kept as float, as before,
     *               so the value is not truncated where int is 32 bits wide)
     */
    public static function currentTimeMillis()
    {
        // microtime(true) is in seconds; one second is 1000 ms (the old factor
        // of 10000 produced values ten times too large).
        return floor(microtime(true) * 1000);
    }
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +namespace RocketMQ\Remoting\Common; + +/** + * Placeholder class for remoting helpers; it currently declares no members. + */ +class RemotingHelper +{ + +} \ No newline at end of file
