This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch stable in repository https://gitbox.apache.org/repos/asf/pulsar-java-contrib.git
commit f0a249f888cfb4d0be666531b897a5007d77ef68 Author: xiangying <[email protected]> AuthorDate: Mon Jul 29 10:49:55 2024 +0800 Project init: Set up contribution guidelines and added example --- .github/pull_request_template.md | 121 ++++------- README.md | 24 +-- best-pratice-blogs/consume-best-practice.md | 229 +++++++++++++++++++++ .../img/blog-consume-best-practice/Ack-hole.png | Bin 0 -> 189219 bytes .../img/blog-consume-best-practice/AckTimeout.png | Bin 0 -> 127045 bytes .../static/img/blog-consume-best-practice/DLQ.png | Bin 0 -> 144988 bytes .../acknowledgement-types.png | Bin 0 -> 111861 bytes .../cumulative-ack-problem.png | Bin 0 -> 82471 bytes .../subscription-types.png | Bin 0 -> 158919 bytes build/quickstarts-showcase/pom.xml | 2 +- conrtibutionGuides.md | 100 +++++++++ contributedFeatures.md | 23 +++ customizationFeatures.md | 11 + openProvider.md | 34 +++ pom.xml | 33 +-- pulsar-auth-contrib/pom.xml | 20 ++ pulsar-bookkeeper-contrib/pom.xml | 20 ++ pulsar-client-contrib/README.md | 37 ---- pulsar-connector-contrib/pom.xml | 20 ++ pulsar-function-contrib/pom.xml | 20 ++ pulsar-interceptor-contrib/pom.xml | 20 ++ pulsar-loadbalance-contrib/pom.xml | 20 ++ pulsar-metrics-contrib/pom.xml | 20 ++ pulsar-transaction-contrib/pom.xml | 20 ++ runQuickstartsFromSource.bat | 8 - runQuickstartsFromSource.sh | 9 - 26 files changed, 618 insertions(+), 173 deletions(-) diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index b4097a1..bf17dfc 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -1,107 +1,68 @@ <!-- -Thank you for submitting this pull request. +### Contribution Checklist + + - PR title format should be *[type][component] summary*. For details, see *[Guideline - Pulsar PR Naming Convention](https://pulsar.apache.org/contribute/develop-semantic-title/)*. -*Do NOT use the default branch `stable` to create a pull request, -use the branch `development` instead. 
The latter uses SNAPSHOT versions.* + - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review. + + - Each pull request should address only one issue, not mix up code from multiple issues. + + - Each commit in the pull request has a meaningful commit message -Please provide all relevant information as outlined below. Feel free to delete -a section if that type of information is not available. - -Any changes to school-timetabling must be synced across its quarkus, kotlin-quarkus, and spring-boot variants, -and also the external https://github.com/quarkusio/quarkus-quickstarts/tree/main/optaplanner-quickstart. + - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. --> -### JIRA - -<!-- Add a JIRA ticket link if it exists. --> -<!-- Example: https://issues.redhat.com/browse/PLANNER-1234 --> +<!-- Either this PR fixes an issue, --> -### Referenced pull requests - -<!-- Add URLs of all referenced pull requests if they exist. This is only required when making -changes that span multiple kiegroup repositories and depend on each other. --> -<!-- Example: -- https://github.com/kiegroup/droolsjbpm-build-bootstrap/pull/1234 -- https://github.com/kiegroup/drools/pull/3000 -- https://github.com/kiegroup/optaplanner/pull/899 -- etc. 
---> +Fixes #xyz -<details> -<summary> -How to retest this PR or trigger a specific build: -</summary> +<!-- or this PR is one task of an issue --> -- for <b>pull request checks</b> - Please add comment: <b>Jenkins retest this</b> +Main Issue: #xyz -- for a <b>specific pull request check</b> - please add comment: <b>Jenkins (re)run [optaplanner-quickstarts] tests</b> +<!-- If the PR belongs to a PIP, please add the PIP link here --> -- for a <b>full downstream build</b> - please add the label `run_fdb` +### Motivation -- for <b>quarkus branch checks</b> - Run checks against Quarkus current used branch - Please add comment: <b>Jenkins run quarkus-branch</b> +<!-- Explain here the context, and why you're making that change. What is the problem you're trying to solve. --> -- for a <b>quarkus branch specific check</b> - Run checks against Quarkus current used branch - Please add comment: <b>Jenkins (re)run [optaplanner-quickstarts] quarkus-branch</b> +### Modifications -- for <b>quarkus main checks</b> - Run checks against Quarkus main branch - Please add comment: <b>Jenkins run quarkus-main</b> +<!-- Describe the modifications you've done. --> -- for a <b>specific quarkus main check</b> - Run checks against Quarkus main branch - Please add comment: <b>Jenkins (re)run [optaplanner-quickstarts] quarkus-branch</b> +### Verifying this change -- for <b>quarkus lts checks</b> - Run checks against Quarkus lts branch - Please add comment: <b>Jenkins run quarkus-lts</b> +- [ ] Make sure that the change passes the CI checks. -- for a <b>specific quarkus lts check</b> - Run checks against Quarkus lts branch - Please add comment: <b>Jenkins (re)run [optaplanner-quickstarts] quarkus-lts</b> +*(Please pick either of the following options)* -- for <b>native checks</b> - Run native checks - Please add comment: <b>Jenkins run native</b> +This change is a trivial rework / code cleanup without any test coverage. 
-- for a <b>specific native check</b> - Run native checks - Please add comment: <b>Jenkins (re)run [optaplanner-quickstarts] native</b> +*(or)* -- for <b>mandrel checks</b> - Run native checks against Mandrel image - Please add comment: <b>Jenkins run mandrel</b> +This change is already covered by existing tests, such as *(please describe tests)*. -- for a <b>specific mandrel check</b> - Run native checks against Mandrel image - Please add comment: <b>Jenkins (re)run [optaplanner-quickstarts] mandrel</b> +*(or)* -- for <b>mandrel lts checks</b> - Run native checks against Mandrel image and quarkus lts branch - Please add comment: <b>Jenkins run mandrel-lts</b> +This change added tests and can be verified as follows: -- for a <b>specific mandrel lts check</b> - Run native checks against Mandrel image and quarkus lts branch - Please add comment: <b>Jenkins (re)run [optaplanner-quickstarts] mandrel-lts</b> -</details> +*(example:)* +- *Added integration tests for end-to-end deployment with large payloads (10MB)* +- *Extended integration test for recovery after broker failure* -<details> -<summary> -How to backport a pull request to a different branch? -</summary> +### Matching PR in forked repository -In order to automatically create a **backporting pull request** please add one or more labels having the following format `backport-<branch-name>`, where `<branch-name>` is the name of the branch where the pull request must be backported to (e.g., `backport-7.67.x` to backport the original PR to the `7.67.x` branch). +PR in forked repository: <!-- ENTER URL HERE --> -> **NOTE**: **backporting** is an action aiming to move a change (usually a commit) from a branch (usually the main one) to another one, which is generally referring to a still maintained release branch. Keeping it simple: it is about to move a specific change or a set of them from one branch to another. 
- -Once the original pull request is successfully merged, the automated action will create one backporting pull request per each label (with the previous format) that has been added. +<!-- +After opening this PR, the build in apache/pulsar will fail and instructions will +be provided for opening a PR in the PR author's forked repository. -If something goes wrong, the author will be notified and at this point a manual backporting is needed. +apache/pulsar pull requests should be first tested in your own fork since the +apache/pulsar CI based on GitHub Actions has constrained resources and quota. +GitHub Actions provides separate quota for pull requests that are executed in +a forked repository. -> **NOTE**: this automated backporting is triggered whenever a pull request on `main` branch is labeled or closed, but both conditions must be satisfied to get the new PR created. -</details> +The tests will be run in the forked repository until all PR review comments have +been handled, the tests pass and the PR is approved by a reviewer. +--> diff --git a/README.md b/README.md index b7ed14e..dd290c5 100644 --- a/README.md +++ b/README.md @@ -1,29 +1,21 @@ # Apache Pulsar Java Contrib -[](https://github.com/StevenLuMT/pulsar-java-contrib/actions/workflows/build.yml) +Pulsar Java Contrib is a repository, maintained outside the Pulsar core code, that collects plugin implementations, personalized features, experimental features, and best practices from users. -pulsar-java-contrib is similar to the positioning of [opentelemetry-java-contrib](https://github.com/open-telemetry/opentelemetry-java-contrib): -* One is to prevent the Pulsar main repository from adding too many unnecessary functions, which would make Pulsar too bloated, increasing the user's usage cost and our maintenance cost. -* The other is to reduce the user's usage cost by implementing a ready-to-use implementation class based on Pulsar's external interface.
+- [Plugin Contribution Guide](conrtibutionGuides.md) lists the core interfaces in Pulsar that contributors can implement and provides implementation guidelines for each type of interface. -these ways achieve these two goals are the plugin library and the yellow pages. If you need an easier way to implement some plugin library based on Pulsar that cannot be easily satisfied by importing Pulsar directly, then this project is hopefully for you. +- [Plugin Implementation List](contributedFeatures.md) lists the implemented plugins, so users can pick the ones they need and reuse them. -## Provided Libraries +- [Personalization Features](customizationFeatures.md) lists customized and experimental features that require modifications to the Pulsar source code. +- [Best Practices](best-pratice-blogs) lists best practices for each feature, summarized from community contributions. + - [consume-best-practice.md](best-pratice-blogs/consume-best-practice.md) -* [Pulsar Client Contrib](./pulsar-client-contrib/README.md) - -## Getting Started - -```bash - -``` ## Contributing -pulsar-java-contrib is actively in development. If you have an idea for a similar use case in the metrics, traces, or logging -domain we would be very interested in supporting it. Please -[open an issue](https://github.com/StevenLuMT/pulsar-java-contrib/issues/new/choose) to share your idea or +pulsar-java-contrib is actively in development. If you have common use cases for plugins, please contact us; we'll be happy to support them. +Please [open an issue](https://github.com/StevenLuMT/pulsar-java-contrib/issues/new/choose) to share your idea or suggestion. PRs are always welcome and greatly appreciated, but for larger functional changes a pre-coding introduction can be helpful to ensure this is the correct place and that active or conflicting efforts don't exist.
diff --git a/best-pratice-blogs/consume-best-practice.md b/best-pratice-blogs/consume-best-practice.md
new file mode 100644
index 0000000..87deff9
--- /dev/null
+++ b/best-pratice-blogs/consume-best-practice.md
@@ -0,0 +1,229 @@
+# Consume Best Practice
+
+## Background Knowledge
+
+### Subscription Types
+
+Pulsar is a distributed messaging system in which producers send messages to topics and consumers consume them.
+Consumers can subscribe to a topic in four ways (subscription types):
+
+* **Exclusive**
+* **Failover**
+* **Shared**
+* **Key-shared**
+
+In Exclusive and Failover modes, the messages of a single partition are consumed in order. In Shared mode, messages can
+be consumed out of order, and in Key-shared mode, order is preserved only among messages with the same key. The main
+difference between Exclusive and Failover is that in Exclusive mode only one consumer can attach to the subscription,
+while in Failover mode several consumers can attach but only one of them is active for each partition. The others are
+standby consumers that take over if the active one disconnects. The main difference between Shared and Key-shared is
+that Key-shared dispatches messages by key, so messages with the same key go to the same consumer. For more information
+about subscription types, refer to the [Pulsar website](https://pulsar.apache.org/docs/3.2.x/concepts-messaging/).
+
+
+### Acknowledgment
+
+A message should be acknowledged after it has been fully consumed and processed. Once it is acknowledged, it is not
+delivered to the same subscription again. Pulsar provides two ways to acknowledge messages:
+
+* **Cumulative acknowledgment**
+* **Individual acknowledgment**
+
+Cumulative acknowledgment takes a message or a message ID as a parameter and marks that message and all messages before
+it as consumed for this subscription. For partitioned topics, a cumulative acknowledgment applies only to the partition
+of the given message and does not affect other partitions. Individual acknowledgment takes a message or a message ID as
+a parameter and marks only that message as consumed for this subscription.
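The following minimal in-memory sketch makes the difference between the two acknowledgment types concrete by modeling the acknowledgment state of one partition. It is not the Pulsar client API. The class `PartitionAckState` and its methods are hypothetical, and message IDs are simplified to consecutive `long` values. The model keeps a "mark-delete" position, meaning everything at or below it is acknowledged. It also keeps a set of individually acknowledged IDs above that position. This design is loosely inspired by the way a broker can track a subscription's progress.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical, in-memory model of one partition's acknowledgment state.
// It is NOT the Pulsar client API; it only illustrates the two acknowledgment semantics.
class PartitionAckState {
    // Every message with an ID <= markDelete is acknowledged.
    private long markDelete = -1;
    // Messages acknowledged individually above the mark-delete position.
    private final TreeSet<Long> individuallyAcked = new TreeSet<>();

    // Individual acknowledgment: marks only this message as consumed.
    void ackIndividual(long messageId) {
        if (messageId <= markDelete) {
            return; // already covered by the mark-delete position
        }
        individuallyAcked.add(messageId);
        advanceMarkDelete();
    }

    // Cumulative acknowledgment: marks this message and every earlier one as consumed.
    void ackCumulative(long messageId) {
        if (messageId > markDelete) {
            markDelete = messageId;
            // Individual acks at or below the new position are now redundant.
            individuallyAcked.headSet(messageId, true).clear();
            advanceMarkDelete();
        }
    }

    boolean isAcked(long messageId) {
        return messageId <= markDelete || individuallyAcked.contains(messageId);
    }

    // Unacknowledged IDs in [0, upTo); gaps below acknowledged IDs are "ack holes".
    List<Long> unackedBefore(long upTo) {
        List<Long> holes = new ArrayList<>();
        for (long id = 0; id < upTo; id++) {
            if (!isAcked(id)) {
                holes.add(id);
            }
        }
        return holes;
    }

    long markDeletePosition() {
        return markDelete;
    }

    // Fold individual acks that are contiguous with the mark-delete position into it.
    private void advanceMarkDelete() {
        while (individuallyAcked.remove(markDelete + 1)) {
            markDelete++;
        }
    }
}
```

For example, individually acknowledging IDs 0, 1 and 3 leaves `unackedBefore(5)` equal to `[2, 4]`, where 2 is an ack hole. A single `ackCumulative(4)` then covers both remaining IDs at once.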
+
+
+### Message Redelivery Mechanism
+
+Sometimes received messages cannot be processed right away, or errors occur during processing. In these cases, the
+client needs the unacknowledged messages, or a particular message, to be redelivered immediately or after a delay.
+Without transactions, Pulsar provides at-least-once semantics, because the client may still hold cached copies of
+messages outside Pulsar when those messages are redelivered.
+
+The Pulsar Consumer API provides four ways to consume unacknowledged messages again later:
+
+* **ackTimeout**
+* **deadLetterPolicy**
+  * **reconsumeLaterCumulative**
+  * **reconsumeLater**
+* **negativeAcknowledge**
+* **redeliverUnacknowledgedMessages**
+
+**ackTimeout** is an automatic protection mechanism. If a consumer is configured with ackTimeout, received messages
+that are not acknowledged within the timeout are redelivered automatically. It works on the client side. When the
+consumer's connection remains active but the business system gets stuck or fails to acknowledge some messages, the
+unacknowledged messages are still redelivered, possibly to another consumer. If the consumer disconnects because the
+client crashes, the broker also redelivers the messages automatically. However, this mechanism does not wait for the
+acknowledgment response, so if an acknowledgment fails on the broker side or the proxy side, an ack hole may occur.
+
+**deadLetterPolicy** is a policy for handling messages that cannot be processed properly. Many message queue systems
+(such as RabbitMQ, Google Pub/Sub, and Apache Pulsar) implement this strategy. In Pulsar, deadLetterPolicy is
+implemented on the client side: a retry letter topic and a dead letter topic are created when a consumer is built
+with the `deadLetterPolicy` configuration.
+When a consumer calls `reconsumeLaterCumulative` or `reconsumeLater`, the message passed to the method is produced to
+the retry letter topic until its retry count reaches `maxRedeliverCount`. After that, it is produced to the dead
+letter topic. The main difference between the two methods is that `reconsumeLaterCumulative` acknowledges the message
+cumulatively after producing it, while `reconsumeLater` acknowledges it individually.
+
+`negativeAcknowledge` is used to redeliver specific unacknowledged messages, while `redeliverUnacknowledgedMessages`
+redelivers all unacknowledged messages received by this consumer. Unlike deadLetterPolicy, neither method creates a
+new topic, and the number of redeliveries is unlimited.
+
+## Best Practice Suggestions
+
+Different scenarios call for different best practices. Some users care about the order of messages within a partition
+and want to process data in batches. They should choose or implement an appropriate routing policy (routerPolicy) so
+that a batch of ordered messages is sent to the same partition, and they should use the Exclusive or Failover
+subscription mode. Users who do not care about message order, including those in stream processing scenarios, can use
+the Shared or Key-shared subscription mode.
+
+### Shared && Key-shared
+#### At-least-once
+`At least once` semantics guarantee that each message is processed at least once. Duplicate messages are possible,
+but performance is better than with `Exactly once` semantics. For `At least once` semantics in Shared or Key-shared
+mode, the most important thing is to avoid ack holes as much as possible. An ack hole occurs when individual messages
+are left unacknowledged while the messages around them are acknowledged. This can happen in the following cases. All
+of them assume that the connection is not interrupted, because after a reconnection the unacknowledged messages are
+resent automatically.
+
+1. The consumer receives messages but fails to process them because of a business system error.
+2. The consumer receives messages but never processes them, or the business system gets stuck.
+3. The consumer acknowledges a message, but the acknowledgment request is lost on its way to the broker. This can
+   happen even over a long-lived TCP connection, although the probability is extremely low.
+
+
+For case 1, configuring `deadLetterPolicy` is a good solution. When the business system detects that processing has
+failed, it can immediately decide whether to retry the message after a period of time. After it calls `reconsumeLater`,
+the message is acknowledged and resent to the retry letter topic, which the consumer subscribes to automatically. When
+the retry count reaches `maxRedeliverCount`, the message is sent to the dead letter topic. Messages in the dead letter
+topic should be treated as non-retryable. We recommend setting `initialSubscriptionName` so that these messages are
+not deleted by the retention policy. Maintenance staff should then regularly handle the non-retryable messages in the
+dead letter topic.
+
+```java
+    Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
+            .deadLetterPolicy(DeadLetterPolicy.builder()
+                    .maxRedeliverCount(maxRedeliverCount)
+                    .deadLetterTopic(deadLetterTopic)
+                    .retryLetterTopic(retryLetterTopic)
+                    .initialSubscriptionName(initialSubscriptionName)
+                    .build())
+            .subscriptionType(SubscriptionType.Shared)
+            .enableRetry(true)
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .topic(topic)
+            .subscriptionName(sub)
+            .subscribe();
+    Message<Integer> message = consumer.receive();
+    try {
+        // Process the message
+        consumer.acknowledge(message);
+    } catch (Exception e) {
+        // Decide whether the message should be retried later.
+        // RetryException stands for a business-defined "retryable" error type.
+        if (e instanceof RetryException) {
+            // Acknowledge the message and send it to the retry letter topic; it is consumed again after the delay
+            consumer.reconsumeLater(message, delay, timeunit);
+        } else {
+            // Non-retryable: record it for manual handling, then acknowledge it so it is not redelivered
+            consumer.acknowledge(message);
+        }
+    }
+```
+
+For case 2, where the business system gets stuck or an error causes a received message to be skipped, configuring an
+ack timeout is a good solution. The consumer records every received message on the client side. If these messages are
+not acknowledged within the specified time, the consumer asks the broker to redeliver them, possibly to other
+consumers.
+
+**Suggestion:** Set the ack timeout somewhat longer than the time the business system needs to process a message.
+If the timeout is shorter than the actual processing time, messages that are still being processed are redelivered,
+which results in duplicate consumption.
+
+```java
+    Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
+            .ackTimeout(timeout, timeunit)
+            .subscriptionType(SubscriptionType.Shared)
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .topic(topic)
+            .subscriptionName(sub)
+            .subscribe();
+```
+
+For case 3, there are no effective preventive measures.
+This is because, while the connection stays open, every redelivery method is triggered by the client, and by default
+the client does not wait for an ack response.
+
+`Short-term solution`: This should be an extreme case. After observing an abnormal ack hole (one that persists for
+more than 3 × the ack timeout), users can resolve it by unloading the topic.
+`Long-term solution`: Modify the ack-timeout mechanism to wait for the acknowledgment response.
+
+#### Exactly-once
+If users cannot tolerate duplicate messages, they can acknowledge messages with a transaction, which prevents repeated
+consumption. When the client acknowledges a message with a transaction, it waits for the response. If the message has
+already been acknowledged, the acknowledgment fails with `TransactionConflictException`.
+
+**Notice:** When using transactions, do not configure DeadLetterPolicy. Use negativeAcknowledge to redeliver messages
+instead.
+
+```java
+    Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
+            .ackTimeout(timeout, timeunit)
+            .subscriptionType(SubscriptionType.Shared)
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .topic(topic)
+            .subscriptionName(sub)
+            .subscribe();
+    // The PulsarClient must be built with enableTransaction(true)
+    Transaction transaction = pulsarClient.newTransaction()
+            .withTransactionTimeout(timeout, timeunit)
+            .build()
+            .get();
+
+    Message<Integer> message = consumer.receive();
+    try {
+        // Process the message
+        // Wait for the transactional ack so that a conflict surfaces here
+        consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
+        transaction.commit().get();
+    } catch (Exception e) {
+        // A TransactionConflictException means the message was already acknowledged, so do not redeliver it
+        if (!(e.getCause() instanceof PulsarClientException.TransactionConflictException)) {
+            consumer.negativeAcknowledge(message);
+        }
+        transaction.abort().get();
+    } finally {
+        message.release();
+    }
+```
+
+### Exclusive && Failover
+#### At-least-once
+For `Exclusive` and `Failover` modes, which follow `At least once` semantics, it's crucial to maintain the order of
+messages while ensuring that none are lost.
+We recommend using cumulative acknowledgment in `Exclusive` or `Failover` mode. When a message is cumulatively
+acknowledged, Pulsar guarantees that the consumer has received all messages before it. As a result, there are no ack
+holes and no need to redeliver a specific message. Redelivering a specific message is also discouraged, because it can
+put messages out of order. When processing a batch of messages fails, use `redeliverUnacknowledgedMessages` to
+redeliver all unprocessed messages so that their order is preserved.
+
+```java
+    Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
+            .subscriptionType(SubscriptionType.Exclusive)
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .topic(topic)
+            .subscriptionName(sub)
+            .subscribe();
+    Message<Integer> message = consumer.receive();
+    try {
+        // Process the message
+        message = consumer.receive();
+        // Process the message
+        // ......
+        // Acknowledging the last message of the batch also acknowledges every earlier message
+        consumer.acknowledgeCumulative(message);
+    } catch (Exception e) {
+        // Redeliver all unacknowledged messages so that they stay in order
+        consumer.redeliverUnacknowledgedMessages();
+    } finally {
+        message.release();
+    }
+```
+
+#### Exactly-once (Beta)
+In `Exclusive` and `Failover` modes, the most troublesome issue for users is duplicate messages caused by
+disconnection. When a connection is reset, the broker resends all unacknowledged messages to the consumer. The
+consumer may already have processed many of these messages without acknowledging them, so it unknowingly processes
+them again. Unfortunately, there is currently no effective way to prevent this from happening, and Pulsar
+transactions are not yet sufficient to solve this problem.
+
+
+`Long-term solution`: Apply an epoch to avoid receiving repeated messages, and abort the transaction in the extreme
+case (consumers change frequently in the Failover subscription type).
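The short simulation below reproduces the duplicate problem described for Exclusive and Failover modes. It is a hypothetical model of one partition in an Exclusive subscription, not Pulsar code. The class `ReconnectDuplicates` and its parameters are illustrative:

- The consumer cumulatively acknowledges every `ackEvery` messages.
- The connection resets after `processedBeforeReset` messages.
- Redelivery is assumed to start right after the last cumulatively acknowledged message, which also assumes that acknowledgment reached the broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of one partition consumed through an Exclusive subscription.
// Not Pulsar code: it only counts how often each message ID is processed.
class ReconnectDuplicates {

    // Returns message ID -> number of times it was processed, sorted by ID.
    static Map<Integer, Integer> simulate(int totalMessages, int ackEvery, int processedBeforeReset) {
        if (ackEvery < 1 || processedBeforeReset < 0 || processedBeforeReset > totalMessages) {
            throw new IllegalArgumentException("invalid simulation parameters");
        }
        Map<Integer, Integer> processCount = new TreeMap<>();
        int lastAcked = -1; // highest ID covered by a cumulative acknowledgment

        // First connection: process messages and acknowledge cumulatively every `ackEvery` messages.
        for (int id = 0; id < processedBeforeReset; id++) {
            processCount.merge(id, 1, Integer::sum);
            if ((id + 1) % ackEvery == 0) {
                lastAcked = id; // stands for a cumulative ack of the message with this ID
            }
        }

        // Connection reset: every message after the last acknowledged one is redelivered,
        // including messages that were already processed but not yet acknowledged.
        for (int id = lastAcked + 1; id < totalMessages; id++) {
            processCount.merge(id, 1, Integer::sum);
        }
        return processCount;
    }

    // IDs processed more than once (ascending when given the TreeMap returned by simulate).
    static List<Integer> duplicates(Map<Integer, Integer> processCount) {
        List<Integer> result = new ArrayList<>();
        for (Map.Entry<Integer, Integer> e : processCount.entrySet()) {
            if (e.getValue() > 1) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```

Take 10 messages, an acknowledgment every 4 messages, and a reset after 7. Messages 4, 5 and 6 were processed but not yet acknowledged, so the model processes them twice. This matches the situation the section describes.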
+
+In conclusion, ensuring the reliable transmission of messages in Pulsar involves understanding the different subscription
+types, acknowledgment methods, and message redelivery mechanisms. Different scenarios require different best practices,
+and users should consider factors such as the order of messages within a partition, the possibility of duplicate
+messages, and the speed of the business system. The solutions provided here can help users address common issues and
+optimize their use of Pulsar. They include configuring `deadLetterPolicy` and `ackTimeout`, using transactions, and
+modifying the ack-timeout mechanism.
\ No newline at end of file
diff --git a/best-pratice-blogs/static/img/blog-consume-best-practice/Ack-hole.png b/best-pratice-blogs/static/img/blog-consume-best-practice/Ack-hole.png
new file mode 100644
index 0000000..eda0b9e
Binary files /dev/null and b/best-pratice-blogs/static/img/blog-consume-best-practice/Ack-hole.png differ
diff --git a/best-pratice-blogs/static/img/blog-consume-best-practice/AckTimeout.png b/best-pratice-blogs/static/img/blog-consume-best-practice/AckTimeout.png
new file mode 100644
index 0000000..1a04ee9
Binary files /dev/null and b/best-pratice-blogs/static/img/blog-consume-best-practice/AckTimeout.png differ
diff --git a/best-pratice-blogs/static/img/blog-consume-best-practice/DLQ.png b/best-pratice-blogs/static/img/blog-consume-best-practice/DLQ.png
new file mode 100644
index 0000000..373621f
Binary files /dev/null and b/best-pratice-blogs/static/img/blog-consume-best-practice/DLQ.png differ
diff --git a/best-pratice-blogs/static/img/blog-consume-best-practice/acknowledgement-types.png b/best-pratice-blogs/static/img/blog-consume-best-practice/acknowledgement-types.png
new file mode 100644
index 0000000..ebf3d33
Binary files /dev/null and b/best-pratice-blogs/static/img/blog-consume-best-practice/acknowledgement-types.png differ
diff --git a/best-pratice-blogs/static/img/blog-consume-best-practice/cumulative-ack-problem.png b/best-pratice-blogs/static/img/blog-consume-best-practice/cumulative-ack-problem.png
new file mode 100644
index 0000000..7bb3033
Binary files /dev/null and b/best-pratice-blogs/static/img/blog-consume-best-practice/cumulative-ack-problem.png differ
diff --git a/best-pratice-blogs/static/img/blog-consume-best-practice/subscription-types.png b/best-pratice-blogs/static/img/blog-consume-best-practice/subscription-types.png
new file mode 100644
index 0000000..52aa79c
Binary files /dev/null and b/best-pratice-blogs/static/img/blog-consume-best-practice/subscription-types.png differ
diff --git a/build/quickstarts-showcase/pom.xml b/build/quickstarts-showcase/pom.xml
index 64e69f2..6981cba 100644
--- a/build/quickstarts-showcase/pom.xml
+++ b/build/quickstarts-showcase/pom.xml
@@ -4,7 +4,7 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.optaplanner</groupId>
-        <artifactId>optaplanner-quickstarts-parent</artifactId>
+        <artifactId>pulsar-java-contrib</artifactId>
         <version>9.44.0.Final</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
diff --git a/conrtibutionGuides.md b/conrtibutionGuides.md
new file mode 100644
index 0000000..54df75c
--- /dev/null
+++ b/conrtibutionGuides.md
@@ -0,0 +1,100 @@
+# Apache Pulsar Plugin Contribution Guide
+
+## Welcome
+Thank you for contributing to the Apache Pulsar project. This document helps you understand and implement Pulsar's extensible interfaces.
+
+## 1. Introduction
+Apache Pulsar is a distributed messaging and streaming platform. You can find detailed information [here](https://pulsar.apache.org/).
+By exposing interfaces and configuration options, Pulsar lets users write custom plugins and integrate them into Pulsar to meet their specific needs.
+Many of these plugins are similar and can be reused. This project aims to collect and organize implementations of various plugins, reduce the development cost caused by repeated implementations in the community, and lower the barrier to using Pulsar.
+
+## 2. Core interface list
+This section lists the core interfaces in Pulsar that contributors can implement.
+
+### 2.1 Pulsar client extension interface
+- `org.apache.pulsar.client.api.MessageListenerExecutor.java`
+
+### 2.2 Authentication and authorization related interfaces
+- `org.apache.pulsar.broker.authorization.AuthorizationProvider`
+- `org.apache.pulsar.client.api.AuthenticationDataProvider`
+- `org.apache.pulsar.functions.auth.FunctionAuthProvider`
+- `org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider`
+- `org.apache.pulsar.functions.secretsprovider.SecretsProvider`
+
+### 2.3 Transaction-related interfaces
+- `org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider`
+- `org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider`
+- `org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider`
+
+### 2.4 Load Balancer Extension Interface
+- `org.apache.pulsar.broker.loadbalance.ModularLoadManager`
+- `org.apache.pulsar.common.naming.TopicBundleAssignmentStrategy`
+- `org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy`
+- `org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy`
+- `org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter`
+
+### 2.5 Interceptor Interface
+- `org.apache.pulsar.client.api.ProducerInterceptor`
+- `org.apache.pulsar.client.api.ConsumerInterceptor`
+- `org.apache.pulsar.client.api.ReaderInterceptor`
+- `org.apache.pulsar.broker.intercept.BrokerInterceptor`
+- `org.apache.pulsar.broker.service.TopicEventsListener`
+- `org.apache.pulsar.client.api.ConsumerEventListener`
+- `org.apache.pulsar.client.impl.transaction.TransactionBufferHandler`
+
+### 2.6 Pulsar Connector Interface
+- `org.apache.pulsar.io.core.Sink`
+- `org.apache.pulsar.io.core.Source`
+
+### 2.7 Pulsar Function Interface
+- `org.apache.pulsar.functions.api.Function`
+
+### 2.8 BookKeeper related interfaces
+- `org.apache.pulsar.broker.service.schema.SchemaStorageFactory`
+- `org.apache.pulsar.packages.management.core.PackagesStorageProvider`
+- `org.apache.bookkeeper.mledger.ManagedLedger`
+
+### 2.9 Metrics related interfaces
+- `org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider`
+
+## 3. Interface Implementation Guide
+This section provides an implementation guide for each type of interface.
+
+### 3.1 Client extension interface implementation
+- `org.apache.pulsar.client.api.MessageListenerExecutor.java`
+- **Purpose**: Select different message processing threads according to the business scenario.
+- **Sample code**: org.apache.pulsar.client.api.impl.KeySharedMessageListenerExecutor, org.apache.pulsar.client.api.impl.CommonMessageListenerExecutor, org.apache.pulsar.client.api.impl.PartitionOrderMessageListenerExecutor
+
+### 3.2 Authentication and authorization related interfaces
+- **Purpose**: Pulsar currently provides only a few default implementations of the authentication and authorization interfaces. Users can implement these interfaces to provide the authentication and authorization they need.
+- **Sample code**: https://github.com/apache/pulsar/tree/master/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization, https://github.com/apache/pulsar/tree/master/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication
+
+### 3.3 Implementation of transaction-related interfaces
+- **Purpose**: Customize transaction components for different business requirements. For example, a Transaction Buffer implemented for an Exactly-once requirement may involve different considerations and a different implementation.
+- **Sample code**: https://github.com/apache/pulsar/tree/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl, https://github.com/apache/pulsar/tree/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack, https://github.com/apache/pulsar/tree/master/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator
+
+### 3.4 Load balancer extension interface implementation
+- **Purpose**: When the existing load balancing strategies cannot meet your business needs, or are not the best fit for your scenario, you can extend an existing strategy and modify it, or implement a fully customized load balancing strategy for your business.
+- **Sample code**: See the existing official implementations: https://github.com/apache/pulsar/tree/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance
+
+### 3.5 Interceptor extension interface implementation
+- **Purpose**: Users can implement Pulsar's interceptor interfaces for logging, auditing, message conversion, and filtering. These interceptors often have similar implementations that can be abstracted and reused. For example, a set of logging interceptors can be reused as a whole.
+- **Sample code**: None
+
+### 3.6 Connector interface implementation
+- **Purpose**: Connect Pulsar with external systems to import and export data.
+- **Sample code**: https://github.com/apache/pulsar/tree/master/pulsar-io
+
+### 3.7 Pulsar function interface implementation
+- **Purpose**: Implement serverless computing logic that responds to data flow changes in Pulsar.
+- **Sample code**: https://github.com/apache/pulsar/tree/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples + +### 3.8 Bookkeeper related interface implementation +- **Purpose**: Customize the processing logic for persistent data. +- **Sample code**: https://github.com/apache/pulsar/tree/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl, https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java + +### 3.9 Metrics related interface +- **Purpose**: Customize how metric data is generated for the Prometheus monitoring system. +- **Sample code**: +```java +PrometheusRawMetricsProvider rawMetricsProvider = stream -> stream.write("test_metrics{label1=\"xyz\"} 10 \n"); +``` \ No newline at end of file diff --git a/contributedFeatures.md b/contributedFeatures.md new file mode 100644 index 0000000..63e44ba --- /dev/null +++ b/contributedFeatures.md @@ -0,0 +1,23 @@ +# List of implemented plugins + +## Client extension interface implementation +- `org.apache.pulsar.client.api.MessageListenerExecutor.java` + - org.apache.pulsar.client.api.impl.KeySharedMessageListenerExecutor + - org.apache.pulsar.client.api.impl.CommonMessageListenerExecutor + - org.apache.pulsar.client.api.impl.PartitionOrderMessageListenerExecutor + +## Load balancer extension interface implementation + +## Interceptor extension interface implementation + +## Connector interface implementation + +## Pulsar function interface implementation + +## Bookkeeper related interface implementation + +## Transaction related interface implementation + +## Metrics related interface + +## Authentication and authorization related interface \ No newline at end of file diff --git a/customizationFeatures.md b/customizationFeatures.md new file mode 100644 index 0000000..ded6f0a --- /dev/null +++ b/customizationFeatures.md @@ -0,0 +1,11 @@ +# Customized features in personal
projects + +**Statement**: The following projects are all from the contributors' personal code repositories and are for reference only. + +**Example**: + +## Unitization solution +### Introduction +This project customizes Pulsar's dispatcher so that it can distribute specific messages to clients from specific regions based on topic policies. +### Project address +https://github.com/xxx/pulsar-xxx/pull/xxx/commits/xxx \ No newline at end of file diff --git a/openProvider.md b/openProvider.md new file mode 100644 index 0000000..d8fb813 --- /dev/null +++ b/openProvider.md @@ -0,0 +1,34 @@ +We can, based on Pulsar + + +Transaction + + +- PackagesStorageProvider.java +- TransactionTimeoutTrackerFactory.java +- BrokerInterceptor.java +- ModularLoadManagerStrategy.java +- BrokerFilter.java +- ProtocolHandler.java +- Dispatcher.java +- TopicEventsListener.java +- SchemaDataValidator.java +- AuthorizationProvider.java +- PrometheusRawMetricsProvider.java +- ModularLoadManager.java +- AuthenticationDataProvider.java +- KubernetesFunctionAuthProvider.java +- BookKeeperClientFactoryImpl.java +- ConsumerEventListener.java +- ProducerInterceptor.java +- StateStoreProvider.java +- FunctionAuthProvider.java +- SecretsProvider.java +- SecretsProviderConfigurator.java +- TransactionLog.java +- findbugsExclude.xml +- BookKeeperClientFactory.java +- TopicCompactionStrategy.java +- MetadataStoreProvider.java +- TransactionMetadataStoreProvider.java +- TransactionBufferHandler.java \ No newline at end of file diff --git a/pom.xml b/pom.xml index 06e7379..d107fe2 100644 --- a/pom.xml +++ b/pom.xml @@ -10,25 +10,20 @@ </parent> <!-- IMPORTANT: the individual quickstarts have no parent pom.
--> - <artifactId>optaplanner-quickstarts-parent</artifactId> + <artifactId>pulsar-java-contrib</artifactId> <packaging>pom</packaging> - <name>OptaPlanner Quickstarts</name> + <name>Pulsar Java Contrib</name> <modules> - <module>hello-world</module> - <module>pulsar-client-contrib</module> - <module>technology/java-spring-boot</module> - <module>technology/java-activemq-quarkus</module> - <module>technology/kotlin-quarkus</module> - <module>use-cases/school-timetabling</module> - <module>use-cases/facility-location</module> - <module>use-cases/maintenance-scheduling</module> - <module>use-cases/call-center</module> - <module>use-cases/vaccination-scheduling</module> - <module>use-cases/vehicle-routing</module> - <module>use-cases/order-picking</module> - <module>use-cases/employee-scheduling</module> - <module>build/quickstarts-showcase</module> + <module>pulsar-client-contrib</module> + <module>pulsar-loadbalance-contrib</module> + <module>pulsar-interceptor-contrib</module> + <module>pulsar-connector-contrib</module> + <module>pulsar-function-contrib</module> + <module>pulsar-bookkeeper-contrib</module> + <module>pulsar-transaction-contrib</module> + <module>pulsar-metrics-contrib</module> + <module>pulsar-auth-contrib</module> </modules> <profiles> @@ -39,9 +34,6 @@ <name>!productized</name> </property> </activation> - <modules> - <module>technology/kubernetes</module> - </modules> </profile> <profile> @@ -51,9 +43,6 @@ <name>full</name> </property> </activation> - <modules> - <module>build/optaplanner-distribution</module> - </modules> </profile> </profiles> diff --git a/pulsar-auth-contrib/pom.xml b/pulsar-auth-contrib/pom.xml new file mode 100644 index 0000000..252e4b0 --- /dev/null +++ b/pulsar-auth-contrib/pom.xml @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.optaplanner</groupId> + <artifactId>pulsar-java-contrib</artifactId> + <version>9.44.0.Final</version> + </parent> + + <artifactId>pulsar-auth-contrib</artifactId> + + <properties> + <maven.compiler.source>21</maven.compiler.source> + <maven.compiler.target>21</maven.compiler.target> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + +</project> \ No newline at end of file diff --git a/pulsar-bookkeeper-contrib/pom.xml b/pulsar-bookkeeper-contrib/pom.xml new file mode 100644 index 0000000..f7b94ca --- /dev/null +++ b/pulsar-bookkeeper-contrib/pom.xml @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.optaplanner</groupId> + <artifactId>pulsar-java-contrib</artifactId> + <version>9.44.0.Final</version> + </parent> + + <artifactId>pulsar-bookkeeper-contrib</artifactId> + + <properties> + <maven.compiler.source>21</maven.compiler.source> + <maven.compiler.target>21</maven.compiler.target> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + +</project> \ No newline at end of file diff --git a/pulsar-client-contrib/README.md b/pulsar-client-contrib/README.md deleted file mode 100644 index b7ed14e..0000000 --- a/pulsar-client-contrib/README.md +++ /dev/null @@ -1,37 +0,0 @@ -# Apache Pulsar Java Contrib - -[](https://github.com/StevenLuMT/pulsar-java-contrib/actions/workflows/build.yml) - -pulsar-java-contrib is similar to the positioning of [opentelemetry-java-contrib](https://github.com/open-telemetry/opentelemetry-java-contrib): -* One is to prevent the Pulsar main repository from adding too many unnecessary 
functions, which would make Pulsar too bloated, increasing the user's usage cost and our maintenance cost. -* The other is to reduce the user's usage cost by implementing a ready-to-use implementation class based on Pulsar's external interface. - -these ways achieve these two goals are the plugin library and the yellow pages. If you need an easier way to implement some plugin library based on Pulsar that cannot be easily satisfied by importing Pulsar directly, then this project is hopefully for you. - -## Provided Libraries - - -* [Pulsar Client Contrib](./pulsar-client-contrib/README.md) - -## Getting Started - -```bash - -``` - -## Contributing - -pulsar-java-contrib is actively in development. If you have an idea for a similar use case in the metrics, traces, or logging -domain we would be very interested in supporting it. Please -[open an issue](https://github.com/StevenLuMT/pulsar-java-contrib/issues/new/choose) to share your idea or -suggestion. PRs are always welcome and greatly appreciated, but for larger functional changes a pre-coding introduction -can be helpful to ensure this is the correct place and that active or conflicting efforts don't exist. - -Emeritus maintainers: -- [Steven Lu](https://github.com/StevenLuMT) -- [Liangyepian Zhou](https://github.com/liangyepianzhou) -- [Linlin Duan](https://github.com/AuroraTwinkle) -- [Fenggan Cai](https://github.com/cai152) -- [Jia Zhai](https://github.com/jiazhai) - -Learn more about roles in the [community repository](https://github.com/StevenLuMT/pulsar-java-contrib). 
diff --git a/pulsar-connector-contrib/pom.xml b/pulsar-connector-contrib/pom.xml new file mode 100644 index 0000000..755edc2 --- /dev/null +++ b/pulsar-connector-contrib/pom.xml @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.optaplanner</groupId> + <artifactId>pulsar-java-contrib</artifactId> + <version>9.44.0.Final</version> + </parent> + + <artifactId>pulsar-connector-contrib</artifactId> + + <properties> + <maven.compiler.source>21</maven.compiler.source> + <maven.compiler.target>21</maven.compiler.target> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + +</project> \ No newline at end of file diff --git a/pulsar-function-contrib/pom.xml b/pulsar-function-contrib/pom.xml new file mode 100644 index 0000000..ebf242b --- /dev/null +++ b/pulsar-function-contrib/pom.xml @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.optaplanner</groupId> + <artifactId>pulsar-java-contrib</artifactId> + <version>9.44.0.Final</version> + </parent> + + <artifactId>pulsar-function-contrib</artifactId> + + <properties> + <maven.compiler.source>21</maven.compiler.source> + <maven.compiler.target>21</maven.compiler.target> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + +</project> \ No newline at end of file diff --git a/pulsar-interceptor-contrib/pom.xml b/pulsar-interceptor-contrib/pom.xml new file mode 100644 index 0000000..4020f01 --- /dev/null +++ 
b/pulsar-interceptor-contrib/pom.xml @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.optaplanner</groupId> + <artifactId>pulsar-java-contrib</artifactId> + <version>9.44.0.Final</version> + </parent> + + <artifactId>pulsar-interceptor-contrib</artifactId> + + <properties> + <maven.compiler.source>21</maven.compiler.source> + <maven.compiler.target>21</maven.compiler.target> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + +</project> \ No newline at end of file diff --git a/pulsar-loadbalance-contrib/pom.xml b/pulsar-loadbalance-contrib/pom.xml new file mode 100644 index 0000000..c00016e --- /dev/null +++ b/pulsar-loadbalance-contrib/pom.xml @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.optaplanner</groupId> + <artifactId>pulsar-java-contrib</artifactId> + <version>9.44.0.Final</version> + </parent> + + <artifactId>pulsar-loadbalance-contrib</artifactId> + + <properties> + <maven.compiler.source>21</maven.compiler.source> + <maven.compiler.target>21</maven.compiler.target> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + +</project> \ No newline at end of file diff --git a/pulsar-metrics-contrib/pom.xml b/pulsar-metrics-contrib/pom.xml new file mode 100644 index 0000000..dbf2539 --- /dev/null +++ b/pulsar-metrics-contrib/pom.xml @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" 
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.optaplanner</groupId> + <artifactId>pulsar-java-contrib</artifactId> + <version>9.44.0.Final</version> + </parent> + + <artifactId>pulsar-metrics-contrib</artifactId> + + <properties> + <maven.compiler.source>21</maven.compiler.source> + <maven.compiler.target>21</maven.compiler.target> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + +</project> \ No newline at end of file diff --git a/pulsar-transaction-contrib/pom.xml b/pulsar-transaction-contrib/pom.xml new file mode 100644 index 0000000..bbe725b --- /dev/null +++ b/pulsar-transaction-contrib/pom.xml @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.optaplanner</groupId> + <artifactId>pulsar-java-contrib</artifactId> + <version>9.44.0.Final</version> + </parent> + + <artifactId>pulsar-transaction-contrib</artifactId> + + <properties> + <maven.compiler.source>21</maven.compiler.source> + <maven.compiler.target>21</maven.compiler.target> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + +</project> \ No newline at end of file diff --git a/runQuickstartsFromSource.bat b/runQuickstartsFromSource.bat deleted file mode 100644 index 8cbf74f..0000000 --- a/runQuickstartsFromSource.bat +++ /dev/null @@ -1,8 +0,0 @@ -@ECHO OFF -setLocal enableExtensions enableDelayedExpansion - -cd build -mvnw -f .. verify -DskipTests && cd build && ^ -mvnw -f quickstarts-showcase quarkus:dev -Dstartup-open-browser=true -cd .. -cd .. 
diff --git a/runQuickstartsFromSource.sh b/runQuickstartsFromSource.sh deleted file mode 100755 index c572252..0000000 --- a/runQuickstartsFromSource.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/bash - -# Change directory to the directory of the script -cd "$(dirname $0)" || exit - -cd build -./mvnw -f .. verify -DskipTests && - ./mvnw -f quickstarts-showcase quarkus:dev -Dstartup-open-browser=true -cd ..
