This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
     new 57e9d39  Reader interface documentation (#1096)
57e9d39 is described below

commit 57e9d397c0d35b315bfcd2c71360e257a4739778
Author: Luc Perkins <lucperk...@gmail.com>
AuthorDate: Fri Jan 26 11:25:09 2018 -0800

    Reader interface documentation (#1096)

    * add basic description in concepts/architecture doc
    * begin adding section to java API doc
    * finish draft of java API section
    * finish draft of concepts/architecture section
    * add python example
---
 site/docs/latest/clients/Java.md                   | 40 +++++++++++++----
 site/docs/latest/clients/Python.md                 | 16 +++++++
 .../getting-started/ConceptsAndArchitecture.md     | 51 ++++++++++++++++++++++
 3 files changed, 99 insertions(+), 8 deletions(-)

diff --git a/site/docs/latest/clients/Java.md b/site/docs/latest/clients/Java.md
index 45d55a9..f0fd9b0 100644
--- a/site/docs/latest/clients/Java.md
+++ b/site/docs/latest/clients/Java.md
@@ -215,6 +215,30 @@ CompletableFuture<Message> asyncMessage = consumer.receiveAsync();
 
 Async receive operations return a {% javadoc Message client org.apache.pulsar.client.api.Message %} wrapped in a [`CompletableFuture`](http://www.baeldung.com/java-completablefuture).
 
+## Reader interface
+
+The Pulsar [Reader API](../../getting-started/ConceptsAndArchitecture#reader-interface) enables applications to access messages on Pulsar {% popover topics %}.
+
+With the Reader API, Pulsar clients can "manually position" themselves within a topic, reading all messages from a specified message onward. The Pulsar API for Java enables you to create {% javadoc Reader client org.apache.pulsar.client.api.Reader %} objects by specifying a {% popover topic %}, a {% javadoc MessageId client org.apache.pulsar.client.api.MessageId %}, and a {% javadoc ReaderConfiguration client org.apache.pulsar.client.api.ReaderConfiguration %}.
+
+Here's an example:
+
+```java
+ReaderConfiguration conf = new ReaderConfiguration();
+byte[] msgIdBytes = // Some message ID byte array
+MessageId id = MessageId.fromByteArray(msgIdBytes);
+Reader reader = pulsarClient.createReader(topic, id, conf);
+
+while (true) {
+    Message message = reader.readNext();
+    // Process message
+}
+```
+
+In the example above, a `Reader` object is instantiated for a specific topic and message (by ID); the reader then iterates over each message in the topic after the message identified by `msgIdBytes` (how that value is obtained depends on the application).
+
+The code sample above points the `Reader` object at a specific message (by ID), but you can also use `MessageId.earliest` to point to the earliest available message on the topic or `MessageId.latest` to point to the most recent available message.
+
 ## Authentication
 
 Pulsar currently supports two authentication schemes: [TLS](../../admin/Authz#tls-client-auth) and [Athenz](../../admin/Authz#athenz). The Pulsar Java client can be used with both.
@@ -235,8 +259,7 @@ authParams.put("tlsCertFile", "/path/to/client-cert.pem");
 authParams.put("tlsKeyFile", "/path/to/client-key.pem");
 conf.setAuthentication(AuthenticationTls.class.getName(), authParams);
 
-PulsarClient client = PulsarClient.create(
-        "pulsar+ssl://my-broker.com:6651", conf);
+PulsarClient client = PulsarClient.create("pulsar+ssl://my-broker.com:6651", conf);
 ```
 
 ### Athenz
@@ -270,10 +293,11 @@ PulsarClient client = PulsarClient.create(
         "pulsar+ssl://my-broker.com:6651", conf);
 ```
 
-**Note**: *`privateKey` parameter supports following three patterns format*.
+{% include admonition.html type="info" title="Supported pattern formats"
+content='
+The `privateKey` parameter supports the following three pattern formats:
+
+* `file:///path/to/file`
+* `file:/path/to/file`
+* `data:application/x-pem-file;base64,<base64-encoded value>`' %}
 
-```
-file:///path/to/file
-file:/path/to/file
-data:application/x-pem-file;base64,<base64-encoded value>
-```
diff --git a/site/docs/latest/clients/Python.md b/site/docs/latest/clients/Python.md
index 9b36358..8cad7c4 100644
--- a/site/docs/latest/clients/Python.md
+++ b/site/docs/latest/clients/Python.md
@@ -103,3 +103,19 @@ while True:
 
 client.close()
 ```
+
+### Reader interface example
+
+You can use the Pulsar Python API to access the Pulsar [reader interface](../../getting-started/ConceptsAndArchitecture#reader-interface). Here's an example:
+
+```python
+# MessageId taken from a previously fetched message
+msg_id = msg.message_id()
+
+reader = client.create_reader(TOPIC, msg_id)
+
+while True:
+    msg = reader.receive()
+    print("Received message '%s' id='%s'" % (msg.data(), msg.message_id()))
+    # No acknowledgment
+```
\ No newline at end of file
diff --git a/site/docs/latest/getting-started/ConceptsAndArchitecture.md b/site/docs/latest/getting-started/ConceptsAndArchitecture.md
index 1958af8..22a0440 100644
--- a/site/docs/latest/getting-started/ConceptsAndArchitecture.md
+++ b/site/docs/latest/getting-started/ConceptsAndArchitecture.md
@@ -110,6 +110,7 @@ As in other pub-sub systems, topics in Pulsar are named channels for transmittin
 content="Application does not explicitly create the topic but attempting to write or receive message on a topic that does not yet exist, Pulsar will automatically create that topic under the [namespace](#namespace)." %}
 
 ### Namespace
+
 A namespace is a logical nomenclature within a property. A property can create multiple namespaces via [admin API](../../admin-api/namespaces#create).
 For instance, a property with different applications can create a separate namespace for each application. A namespace allows the application to create and manage a hierarchy of topics. For e.g. `my-property/my-cluster/my-property-app1` is a namespace for the application `my-property-app1` in cluster `my-cluster` for `my-property`. Application can create any number of [topics](#topics) under the namespace.
 
@@ -311,3 +312,53 @@ Whenever the TCP connection breaks, the client will immediately re-initiate this
 [Clients](../../getting-started/Clients) connecting to Pulsar {% popover brokers %} need to be able to communicate with an entire Pulsar {% popover instance %} using a single URL. Pulsar provides a built-in service discovery mechanism that you can set up using the instructions in the [Deploying a Pulsar instance](../../deployment/InstanceSetup#service-discovery-setup) guide.
 
 You can use your own service discovery system if you'd like. If you use your own system, there is just one requirement: when a client performs an HTTP request to an endpoint, such as `http://pulsar.us-west.example.com:8080`, the client needs to be redirected to *some* active broker in the desired {% popover cluster %}, whether via DNS, an HTTP or IP redirect, or some other means.
+
+## Reader interface
+
+In Pulsar, the "standard" [consumer interface](#consumers) involves using {% popover consumers %} to listen on {% popover topics %}, process incoming messages, and finally {% popover acknowledge %} those messages when they've been processed. Whenever a consumer disconnects from and then reconnects to a topic, it automatically begins reading from the earliest un-acked message onward because the topic's cursor is automatically managed by Pulsar.
+
+The **reader interface** for Pulsar enables applications to manually manage cursors. When you use a reader to connect to a topic---rather than a consumer---you need to specify *which* message the reader begins reading from. When specifying that initial message, the reader interface gives you three options:
+
+* The **earliest** available message in the topic
+* The **latest** available message in the topic
+* Some other message between the earliest and the latest. If you select this option, you'll need to explicitly provide a message ID. Your application will be responsible for "knowing" this message ID in advance, perhaps fetching it from a persistent data store or cache.
+
+The reader interface is helpful for use cases like using Pulsar to provide [effectively-once](https://streaml.io/blog/exactly-once/) processing semantics for a stream processing system. For this use case, it's essential that the stream processing system be able to "rewind" topics to a specific message and begin reading there. The reader interface provides Pulsar clients with the low-level abstraction necessary to "manually position" themselves within a topic.
+
+{% include admonition.html type="warning" title="Non-partitioned topics only"
+content="The reader interface for Pulsar cannot currently be used with [partitioned topics](#partitioned-topics)." %}
+
+Here's a Java example that begins reading from the earliest available message on a topic:
+
+```java
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Reader;
+
+String topic = "persistent://sample/standalone/ns1/reader-api-test";
+MessageId id = MessageId.earliest;
+
+// Create a reader on a topic and for a specific message (and onward)
+Reader reader = pulsarClient.createReader(topic, id, new ReaderConfiguration());
+
+while (true) {
+    Message message = reader.readNext();
+
+    // Process the message
+}
+```
+
+To create a reader that will read from the latest available message:
+
+```java
+MessageId id = MessageId.latest;
+Reader reader = pulsarClient.createReader(topic, id, new ReaderConfiguration());
+```
+
+To create a reader that will read from some message between earliest and latest:
+
+```java
+byte[] msgIdBytes = // Some byte array
+MessageId id = MessageId.fromByteArray(msgIdBytes);
+Reader reader = pulsarClient.createReader(topic, id, new ReaderConfiguration());
+```
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.
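The three starting positions documented in this commit (`MessageId.earliest`, `MessageId.latest`, or a specific message ID) can be illustrated with a small in-memory model, written in Java to match the document's examples. This is a sketch under stated assumptions and not the Pulsar client API: `Log`, `Reader`, `EARLIEST`, `latest(...)` and `drain(...)` are hypothetical, message IDs are modeled as sequential `long` values, and, following the Java section's wording ("after the message identified by `msgIdBytes`"), a reader positioned on a message is assumed to return only the messages after it. `readNext()` is non-blocking here, whereas the real `Reader.readNext()` blocks until a message is available.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// In-memory model, NOT the Pulsar client API: Log, Reader, EARLIEST and
// latest(...) are hypothetical stand-ins for a non-partitioned topic,
// org.apache.pulsar.client.api.Reader, MessageId.earliest and MessageId.latest.
public class ReaderStartPositionSketch {

    /** One stored message: a sequential id plus a payload. */
    public static final class Msg {
        public final long id;
        public final String payload;

        Msg(long id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    /** Append-only log standing in for a non-partitioned topic. */
    public static final class Log {
        private final List<Msg> entries = new ArrayList<>();

        /** Appends a message and returns its id (0, 1, 2, ...). */
        public long append(String payload) {
            long id = entries.size();
            entries.add(new Msg(id, payload));
            return id;
        }

        /** Id of the newest stored message, or -1 if the log is empty. */
        public long lastId() {
            return entries.size() - 1;
        }

        /** The message immediately after the given position, if it exists yet. */
        Optional<Msg> firstAfter(long position) {
            long next = position + 1;
            return next < entries.size() ? Optional.of(entries.get((int) next)) : Optional.empty();
        }
    }

    /** Position "before" the first message, playing the role of MessageId.earliest. */
    public static final long EARLIEST = -1;

    /** Position on the newest stored message, playing the role of MessageId.latest. */
    public static long latest(Log log) {
        return log.lastId();
    }

    /** A reader holds its own position and returns messages strictly after it. */
    public static final class Reader {
        private final Log log;
        private long position; // id of the last message this reader returned

        public Reader(Log log, long startPosition) {
            this.log = log;
            this.position = startPosition;
        }

        /** Non-blocking in this model: returns empty once the reader has caught up. */
        public Optional<Msg> readNext() {
            Optional<Msg> next = log.firstAfter(position);
            next.ifPresent(m -> position = m.id);
            return next;
        }
    }

    /** Reads until caught up and collects the payloads in order. */
    public static List<String> drain(Reader reader) {
        List<String> seen = new ArrayList<>();
        for (Optional<Msg> m = reader.readNext(); m.isPresent(); m = reader.readNext()) {
            seen.add(m.get().payload);
        }
        return seen;
    }

    public static void main(String[] args) {
        Log log = new Log();
        log.append("a");
        long idOfB = log.append("b");
        log.append("c");

        Reader fromEarliest = new Reader(log, EARLIEST);
        Reader afterB = new Reader(log, idOfB);
        Reader fromLatest = new Reader(log, latest(log));

        log.append("d"); // published after all three readers were created

        System.out.println(drain(fromEarliest)); // [a, b, c, d]
        System.out.println(drain(afterB));       // [c, d]
        System.out.println(drain(fromLatest));   // [d]
    }
}
```

In this model, pointing a reader at the newest stored message means it only sees traffic published afterwards, which is why `fromLatest` returns just `d`.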
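The concepts section distinguishes consumers from readers by who manages the cursor. The simplified Java sketch below makes that contrast concrete: a consumer's resume point is read from state stored with the topic and advanced by acknowledgments, while a reader's resume point is whatever position the application saved itself. It is a hypothetical model, not Pulsar's implementation: `Topic`, `Consumer`, `Reader` and `ackedUpTo` are illustrative names, acknowledgments are cumulative only, and there is no redelivery timeout.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified in-memory model, NOT Pulsar's implementation: acknowledgments are
// cumulative only, there is no redelivery timeout, and Topic, Consumer, Reader
// and ackedUpTo are hypothetical names used for illustration.
public class CursorOwnershipSketch {

    /** Topic stand-in: messages are addressed by sequential id (0, 1, 2, ...). */
    public static final class Topic {
        final List<String> messages = new ArrayList<>();
        // State kept on the "broker" side: subscription name -> last acknowledged id.
        final Map<String, Long> ackedUpTo = new HashMap<>();

        public void publish(String payload) {
            messages.add(payload);
        }

        public String payload(long id) {
            return messages.get((int) id);
        }
    }

    /** Consumer: its resume point comes from state the topic keeps for the subscription. */
    public static final class Consumer {
        private final Topic topic;
        private final String subscription;
        private long next; // id of the next message to deliver

        public Consumer(Topic topic, String subscription) {
            this.topic = topic;
            this.subscription = subscription;
            // On (re)connect, start at the first unacknowledged message.
            this.next = topic.ackedUpTo.getOrDefault(subscription, -1L) + 1;
        }

        /** Returns the id of the next message, or -1 when caught up. */
        public long receive() {
            return next < topic.messages.size() ? next++ : -1;
        }

        /** Marks every id up to and including the given one as processed. */
        public void acknowledgeCumulative(long id) {
            long current = topic.ackedUpTo.getOrDefault(subscription, -1L);
            topic.ackedUpTo.put(subscription, Math.max(current, id));
        }
    }

    /** Reader: the topic stores nothing for it; the caller supplies the position. */
    public static final class Reader {
        private final Topic topic;
        private long next;

        /** Starts strictly after lastProcessedId; -1 means "from the first message". */
        public Reader(Topic topic, long lastProcessedId) {
            this.topic = topic;
            this.next = lastProcessedId + 1;
        }

        /** Returns the id of the next message, or -1 when caught up. */
        public long readNext() {
            return next < topic.messages.size() ? next++ : -1;
        }
    }

    public static void main(String[] args) {
        Topic topic = new Topic();
        for (String p : new String[] {"a", "b", "c", "d"}) {
            topic.publish(p);
        }

        // Consumer receives ids 0 and 1, acknowledges only 0, then disconnects.
        Consumer consumer = new Consumer(topic, "my-subscription");
        long firstId = consumer.receive();
        consumer.receive();
        consumer.acknowledgeCumulative(firstId);
        Consumer reconnected = new Consumer(topic, "my-subscription");
        System.out.println("consumer resumes at " + topic.payload(reconnected.receive())); // b

        // Reader: the application tracks its own progress (this local variable
        // stands in for a durable store) and hands it back after a restart.
        long lastProcessed = -1;
        Reader reader = new Reader(topic, lastProcessed);
        lastProcessed = reader.readNext(); // id 0 ("a")
        lastProcessed = reader.readNext(); // id 1 ("b")
        Reader restarted = new Reader(topic, lastProcessed);
        System.out.println("reader resumes at " + topic.payload(restarted.readNext())); // c
    }
}
```

One way an application might use the reader side, assuming it can update its output and the saved position together, is the "rewind" that the effectively-once use case in the concepts section calls for: after a restart it neither skips nor reprocesses messages.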