This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
     new 8d0d176  Add javadoc to ReactiveMessageSenderBuilder (#28)
8d0d176 is described below

commit 8d0d1764e79f5ba9f56b6654d1eb6f426b938586
Author: Christophe Bornet <[email protected]>
AuthorDate: Mon Nov 28 10:20:01 2022 +0100

    Add javadoc to ReactiveMessageSenderBuilder (#28)
---
 .../client/api/ReactiveMessageSenderBuilder.java   | 399 +++++++++++++++++++++
 1 file changed, 399 insertions(+)

diff --git 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSenderBuilder.java
 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSenderBuilder.java
index b4ddc52..c690c68 100644
--- 
a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSenderBuilder.java
+++ 
b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessageSenderBuilder.java
@@ -28,74 +28,267 @@ import org.apache.pulsar.client.api.MessageRouter;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
+import org.reactivestreams.Publisher;
 
+/**
+ * Builder interface for {@link ReactiveMessageSender}.
+ *
+ * @param <T> the message payload type
+ * @author Lari Hotari
+ * @author Christophe Bornet
+ */
 public interface ReactiveMessageSenderBuilder<T> {
 
+       /**
+        * Sets the cache to use for the sender.
+        * @param producerCache the cache to set
+        * @return the sender builder instance
+        */
        ReactiveMessageSenderBuilder<T> cache(ReactiveMessageSenderCache 
producerCache);
 
+       /**
+        * Sets the maximum number of in-flight messages for the sender. When 
this value is
+        * reached, backpressure will be triggered on the
+        * {@link ReactiveMessageSender#sendOne(MessageSpec)}/{@link 
ReactiveMessageSender#sendMany(Publisher)}
+        * operations. Note that the maxInflight setting applies globally for 
all the
+        * operations called on the sender.
+        * @param maxInflight the maximum number of in-flight messages for the 
sender.
+        * @return the sender builder instance
+        */
        ReactiveMessageSenderBuilder<T> maxInflight(int maxInflight);
 
+       /**
+        * Sets the maximum number of concurrent subscriptions for the sender. 
Note that the
+        * maxConcurrentSenderSubscriptions setting applies globally for all 
the operations
+        * called on the sender.
+        * @param maxConcurrentSenderSubscriptions the maximum number of 
concurrent
+        * subscriptions for the sender.
+        * @return the sender builder instance
+        */
        ReactiveMessageSenderBuilder<T> maxConcurrentSenderSubscriptions(int 
maxConcurrentSenderSubscriptions);
 
+       /**
+        * Applies a sender spec to configure the sender.
+        * @param senderSpec the sender spec to apply
+        * @return the sender builder instance
+        */
        default ReactiveMessageSenderBuilder<T> 
applySpec(ReactiveMessageSenderSpec senderSpec) {
                getMutableSpec().applySpec(senderSpec);
                return this;
        }
 
+       /**
+        * Converts this builder to an immutable reactive sender spec with the 
same settings.
+        * @return the reactive sender spec.
+        */
        ReactiveMessageSenderSpec toImmutableSpec();
 
+       /**
+        * Converts this builder to a mutable reactive sender spec with the 
same settings.
+        * @return the reactive sender spec.
+        */
        MutableReactiveMessageSenderSpec getMutableSpec();
 
+       /**
+        * Sets the topic this sender will be publishing on.
+        *
+        * <p>
+        * This argument is required when constructing the sender.
+        * @param topicName the name of the topic
+        * @return the sender builder instance
+        */
        default ReactiveMessageSenderBuilder<T> topic(String topicName) {
                getMutableSpec().setTopicName(topicName);
                return this;
        }
 
+       /**
+        * Sets a name for the producers created under this sender.
+        *
+        * <p>
+        * If not assigned, the system will generate a globally unique name to 
each producer.
+        *
+        * <p>
+        * <b>Warning</b>: When specifying a name, it is up to the user to 
ensure that, for a
+        * given topic, the producer name is unique across all Pulsar's 
clusters. Brokers will
+        * enforce that only a single producer given name can be publishing on 
a topic.
+        * @param producerName the name to use for the producer
+        * @return the sender builder instance
+        */
        default ReactiveMessageSenderBuilder<T> producerName(String 
producerName) {
                getMutableSpec().setProducerName(producerName);
                return this;
        }
 
+       /**
+        * Sets the send timeout <i>(default: 30 seconds)</i> for this sender.
+        *
+        * <p>
+        * If a message is not acknowledged by the server before the 
sendTimeout expires, an
+        * error will be reported.
+        *
+        * <p>
+        * Setting the timeout to zero, for example {@code 
setTimeout(Duration.ZERO)} will set
+        * the timeout to infinity, which can be useful when using Pulsar's 
message
+        * deduplication feature, since the client library will retry forever 
to publish a
+        * message. No errors will be propagated back to the application.
+        * @param sendTimeout the send timeout to set
+        * @return the sender builder instance
+        */
        default ReactiveMessageSenderBuilder<T> sendTimeout(Duration 
sendTimeout) {
                getMutableSpec().setSendTimeout(sendTimeout);
                return this;
        }
 
+       /**
+        * Sets the maximum size of the queues holding the messages pending to 
receive an
+        * acknowledgment from the broker. This setting applies to each 
producer created under
+        * this sender.
+        *
+        * <p>
+        * The producer queue size also determines the max amount of memory 
that will be
+        * required by the client application. Until the producer gets a 
successful
+        * acknowledgment back from the broker, it will keep in memory (direct 
memory pool)
+        * all the messages in the pending queue.
+        *
+        * <p>
+        * Default is 0, disable the pending messages check.
+        * @param maxPendingMessages the maximum size of the pending messages 
queue for the
+        * sender to set
+        * @return the sender builder instance
+        */
        default ReactiveMessageSenderBuilder<T> maxPendingMessages(int 
maxPendingMessages) {
                getMutableSpec().setMaxPendingMessages(maxPendingMessages);
                return this;
        }
 
+       /**
+        * Sets the maximum number of pending messages across all the 
partitions. This setting
+        * applies to each producer created under this sender.
+        *
+        * <p>
+        * This setting will be used to lower the max pending messages for each 
partition
+        * ({@link #maxPendingMessages(int)}), if the total exceeds the 
configured value. The
+        * purpose of this setting is to have an upper-limit on the number of 
pending messages
+        * when publishing on a partitioned topic.
+        *
+        * <p>
+        * Default is 0, disable the pending messages across partitions check.
+        *
+        * <p>
+        * If publishing at high rate over a topic with many partitions 
(especially when
+        * publishing messages without a partitioning key), it might be 
beneficial to increase
+        * this parameter to allow for more pipelining within the individual 
partitions
+        * senders.
+        * @param maxPendingMessagesAcrossPartitions the maximum number of 
pending messages
+        * across all the partitions to set
+        * @return the sender builder instance
+        */
        default ReactiveMessageSenderBuilder<T> 
maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
                
getMutableSpec().setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions);
                return this;
        }
 
+       /**
+        * Sets the routing mode for a partitioned producer. This setting 
applies to each
+        * producer created under this sender.
+        *
+        * <p>
+        * The default routing mode is to round-robin across the available 
partitions.
+        *
+        * <p>
+        * This logic is applied when the application is not setting a key on a 
particular
+        * message. If the key is set with {@link 
MessageSpecBuilder#key(String)}, then the
+        * hash of the key will be used to select a partition for the message.
+        * @param messageRoutingMode the message routing mode to set
+        * @return the sender builder instance
+        */
        default ReactiveMessageSenderBuilder<T> 
messageRoutingMode(MessageRoutingMode messageRoutingMode) {
                getMutableSpec().setMessageRoutingMode(messageRoutingMode);
                return this;
        }
 
+       /**
+        * Sets the hashing scheme used to choose the partition on where to 
publish a
+        * particular message.
+        *
+        * <p>
+        * Standard hashing functions available are:
+        * <ul>
+        * <li>{@link HashingScheme#JavaStringHash}: Java {@code 
String.hashCode()} (Default)
+        * <li>{@link HashingScheme#Murmur3_32Hash}: Use Murmur3 hashing 
function. <a href=
+        * 
"https://en.wikipedia.org/wiki/MurmurHash";>https://en.wikipedia.org/wiki/MurmurHash</a>
+        * </ul>
+        * @param hashingScheme the hashing scheme to set
+        * @return the sender builder instance
+        */
        default ReactiveMessageSenderBuilder<T> hashingScheme(HashingScheme 
hashingScheme) {
                getMutableSpec().setHashingScheme(hashingScheme);
                return this;
        }
 
+       /**
+        * Sets the action the sender will take in case of encryption failures.
+        * @param cryptoFailureAction the action the sender will take in case 
of encryption
+        * failures to set
+        * @return the sender builder instance
+        */
        default ReactiveMessageSenderBuilder<T> 
cryptoFailureAction(ProducerCryptoFailureAction cryptoFailureAction) {
                getMutableSpec().setCryptoFailureAction(cryptoFailureAction);
                return this;
        }
 
+       /**
+        * Sets a custom message routing policy by passing an implementation of
+        * {@link MessageRouter}.
+        * @param messageRouter the message router to set
+        * @return the sender builder instance
+        */
        default ReactiveMessageSenderBuilder<T> messageRouter(MessageRouter 
messageRouter) {
                getMutableSpec().setMessageRouter(messageRouter);
                return this;
        }
 
+       /**
+        * Sets the time period within which the messages sent will be batched 
<i>default: 1
+        * ms</i> if batched messages are enabled. If set to a non-zero value, 
messages will
+        * be queued until either:
+        * <ul>
+        * <li>this time interval expires</li>
+        * <li>the maximum number of messages in a batch is reached
+        * ({@link #batchingMaxMessages(int)})
+        * <li>the maximum size of a batch is reached ({@link 
#batchingMaxBytes(int)})
+        * </ul>
+        *
+        * <p>
+        * All messages will be published as a single batch message. The 
consumer will be
+        * delivered individual messages in the batch in the same order they 
were enqueued.
+        * @param batchingMaxPublishDelay the time period within which the 
messages sent will
+        * be batched
+        * @return the sender builder instance
+        * @see #batchingMaxMessages(int)
+        * @see #batchingMaxBytes(int)
+        */
        default ReactiveMessageSenderBuilder<T> 
batchingMaxPublishDelay(Duration batchingMaxPublishDelay) {
                
getMutableSpec().setBatchingMaxPublishDelay(batchingMaxPublishDelay);
                return this;
        }
 
+       /**
+        * Sets the partition switch frequency while batching of messages is 
enabled and using
+        * round-robin routing mode for non-keyed message <i>default: 10</i>.
+        *
+        * <p>
+        * The time period of partition switch is
+        * roundRobinRouterBatchingPartitionSwitchFrequency *
+        * {@link #batchingMaxPublishDelay}. During this period, all messages 
that arrive will
+        * be routed to the same partition.
+        * @param roundRobinRouterBatchingPartitionSwitchFrequency the 
frequency of partition
+        * switch
+        * @return the sender builder instance
+        * @see #messageRoutingMode(MessageRoutingMode)
+        * @see #batchingMaxPublishDelay(Duration)
+        */
        default ReactiveMessageSenderBuilder<T> 
roundRobinRouterBatchingPartitionSwitchFrequency(
                        int roundRobinRouterBatchingPartitionSwitchFrequency) {
                getMutableSpec()
@@ -103,81 +296,287 @@ public interface ReactiveMessageSenderBuilder<T> {
                return this;
        }
 
+       /**
+        * Sets the maximum number of messages permitted in a batch. 
<i>default: 1000</i> If
+        * set to a value greater than 1, messages will be batched until this 
threshold or the
+        * maximum byte size of a batch is reached or the batch publish delay 
has elapsed.
+        *
+        * <p>
+        * All messages in a batch will be published as a single batch message. 
The consumer
+        * will be delivered individual messages in the batch in the same order 
they were
+        * enqueued.
+        * @param batchingMaxMessages the maximum number of messages in a batch 
to set
+        * @return the sender builder instance
+        * @see #batchingMaxPublishDelay(Duration)
+        * @see #batchingMaxBytes(int)
+        */
        default ReactiveMessageSenderBuilder<T> batchingMaxMessages(int 
batchingMaxMessages) {
                getMutableSpec().setBatchingMaxMessages(batchingMaxMessages);
                return this;
        }
 
+       /**
+        * Sets the maximum number of bytes permitted in a batch. <i>default: 
128KB</i> If set
+        * to a value greater than 0, messages will be queued until this 
threshold is reached
+        * or other batching conditions are met.
+        *
+        * <p>
+        * All messages in a batch will be published as a single batched 
message. The consumer
+        * will be delivered individual messages in the batch in the same order 
they were
+        * enqueued.
+        * @param batchingMaxBytes the maximum number of bytes in a batch to set
+        * @return the sender builder instance
+        * @see #batchingMaxPublishDelay(Duration)
+        * @see #batchingMaxMessages(int)
+        */
        default ReactiveMessageSenderBuilder<T> batchingMaxBytes(int 
batchingMaxBytes) {
                getMutableSpec().setBatchingMaxBytes(batchingMaxBytes);
                return this;
        }
 
+       /**
+        * Sets whether batching of messages is enabled for the sender. 
<i>default:
+        * enabled</i>
+        *
+        * <p>
+        * When batching is enabled, multiple calls to
+        * {@link ReactiveMessageSender#sendOne(MessageSpec)}/{@link 
ReactiveMessageSender#sendMany(Publisher)}
+        * may result in a single batch to be sent to the broker, leading to 
better
+        * throughput, especially when publishing small messages. If 
compression is enabled,
+        * messages will be compressed at the batch level, leading to a much 
better
+        * compression ratio for similar headers or contents.
+        *
+        * <p>
+        * When enabled default batch delay is set to 1 ms and default batch 
size is 1000
+        * messages
+        *
+        * <p>
+        * Batching is enabled by default since 2.0.0.
+        * @param batchingEnabled whether batching is enabled
+        * @return the sender builder instance
+        * @see #batchingMaxPublishDelay(Duration)
+        * @see #batchingMaxMessages(int)
+        */
        default ReactiveMessageSenderBuilder<T> batchingEnabled(boolean 
batchingEnabled) {
                getMutableSpec().setBatchingEnabled(batchingEnabled);
                return this;
        }
 
+       /**
+        * Sets the batcher builder of the sender. The sender will use the 
batcher builder to
+        * build a batch message container.This is only used when batching is 
enabled.
+        * @param batcherBuilder the batcher builder to set
+        * @return the sender builder instance
+        */
        default ReactiveMessageSenderBuilder<T> batcherBuilder(BatcherBuilder 
batcherBuilder) {
                getMutableSpec().setBatcherBuilder(batcherBuilder);
                return this;
        }
 
+       /**
+        * Sets whether chunking of messages is enabled If enabled, when the 
message size is
+        * higher than the maximum allowed publishing payload size on the 
broker, then the
+        * sender will split the message into multiple chunks and publish them 
to the broker
+        * separately and in order. The consumer will stitch them together to 
form the
+        * original published message.This allows clients to publish large 
messages.
+        *
+        * <p>
+        * Recommendations to use this feature:
+        *
+        * <pre>
+        * 1. Chunking is only supported by non-shared subscriptions and 
persistent-topic.
+        * 2. Batching shouldn't be used together with chunking.
+        * 3. The {@link ReactivePulsarClient} keeps published messages into a 
buffer until it receives the acknowledgements from the broker.
+        * So, it's better to reduce the {@link #maxPendingMessages} size to 
prevent the sender occupying large amount
+        *  of memory from these buffered messages.
+        * 4. Set the message TTL on the namespace to cleanup incomplete 
chunked messages.
+        * (sometimes, due to broker-restart or publish timeout, the sender 
might fail to publish an entire large message.
+        * So the consumer will not be able to consume and acknowledge those 
messages. So, those messages can
+        * only be discarded by message TTL) Or configure
+        * {@link 
ReactiveMessageConsumerBuilder#expireTimeOfIncompleteChunkedMessage}
+        * 5. Consumer configuration: the consumer should also configure {@link 
ReactiveMessageConsumerBuilder#receiverQueueSize} and {@link 
ReactiveMessageConsumerBuilder#maxPendingChunkedMessage}
+        * </pre>
+        * @param chunkingEnabled whether to enable chunking
+        * @return the sender builder instance
+        */
        default ReactiveMessageSenderBuilder<T> chunkingEnabled(boolean 
chunkingEnabled) {
                getMutableSpec().setChunkingEnabled(chunkingEnabled);
                return this;
        }
 
+       /**
+        * Sets the key reader to be used to encrypt the message payloads.
+        * @param cryptoKeyReader the key reader to be used to encrypt the 
message payloads.
+        * @return the sender builder instance
+        */
        default ReactiveMessageSenderBuilder<T> cryptoKeyReader(CryptoKeyReader 
cryptoKeyReader) {
                getMutableSpec().setCryptoKeyReader(cryptoKeyReader);
                return this;
        }
 
+       /**
+        * Sets the public encryption key names, used by the producer to 
encrypt the data key.
+        *
+        * <p>
+        * At the time of producer creation, the Pulsar client checks if there 
are keys added
+        * to encryptionKeys. If keys are found, a callback
+        * {@link CryptoKeyReader#getPrivateKey(String, Map)} and
+        * {@link CryptoKeyReader#getPublicKey(String, Map)} is invoked against 
each key to
+        * load the values of the key. Applications should implement this 
callback to return
+        * the key in pkcs8 format. If compression is enabled, the message is 
encrypted after
+        * compression. If batch messaging is enabled, the batched message is 
encrypted.
+        * @param encryptionKeys the names of the encryption keys in the key 
store
+        * @return the sender builder instance
+        */
        default ReactiveMessageSenderBuilder<T> encryptionKeys(Set<String> 
encryptionKeys) {
                getMutableSpec().setEncryptionKeys(encryptionKeys);
                return this;
        }
 
+       /**
+        * Sets the compression type for the sender.
+        *
+        * <p>
+        * By default, message payloads are not compressed. Supported 
compression types are:
+        * <ul>
+        * <li>{@link CompressionType#NONE}: No compression (Default)</li>
+        * <li>{@link CompressionType#LZ4}: Compress with LZ4 algorithm. Faster 
but lower
+        * compression than ZLib</li>
+        * <li>{@link CompressionType#ZLIB}: Standard ZLib compression</li>
+        * <li>{@link CompressionType#ZSTD}: Compress with Zstandard codec. 
Since Pulsar 2.3.
+        * Zstd cannot be used if consumer applications are not in version >= 
2.3 as well</li>
+        * <li>{@link CompressionType#SNAPPY} Compress with Snappy codec. Since 
Pulsar 2.4.
+        * Snappy cannot be used if consumer applications are not in version >= 
2.4 as
+        * well</li>
+        * </ul>
+        * @param compressionType the compression type to set
+        * @return the sender builder instance
+        */
        default ReactiveMessageSenderBuilder<T> compressionType(CompressionType 
compressionType) {
                getMutableSpec().setCompressionType(compressionType);
                return this;
        }
 
+       /**
+        * Sets the baseline for the sequence ids for messages published by the 
producer. This
+        * setting applies to each producer created under this sender.
+        *
+        * <p>
+        * First message will be using {@code (initialSequenceId + 1)} as its 
sequence id and
+        * subsequent messages will be assigned incremental sequence ids, if 
not otherwise
+        * specified.
+        * @param initialSequenceId the initial sequence id for the producer to 
set
+        * @return the sender builder instance
+        */
        default ReactiveMessageSenderBuilder<T> initialSequenceId(long 
initialSequenceId) {
                getMutableSpec().setInitialSequenceId(initialSequenceId);
                return this;
        }
 
+       /**
+        * If enabled, the sender will automatically discover new partitions of 
partitioned
+        * topics at runtime.
+        *
+        * <p>
+        * Default is true.
+        * @param autoUpdatePartitions whether to auto discover the partition 
configuration
+        * changes
+        * @return the sender builder instance
+        */
        default ReactiveMessageSenderBuilder<T> autoUpdatePartitions(boolean 
autoUpdatePartitions) {
                getMutableSpec().setAutoUpdatePartitions(autoUpdatePartitions);
                return this;
        }
 
+       /**
+        * Sets the interval of partitions updates <i>(default: 1 minute)</i>. 
This only works
+        * if {@link #autoUpdatePartitions} is enabled.
+        * @param autoUpdatePartitionsInterval the interval of partitions 
updates
+        * @return the sender builder instance
+        */
        default ReactiveMessageSenderBuilder<T> 
autoUpdatePartitionsInterval(Duration autoUpdatePartitionsInterval) {
                
getMutableSpec().setAutoUpdatePartitionsInterval(autoUpdatePartitionsInterval);
                return this;
        }
 
+       /**
+        * Sets whether to enable the multiple schema mode for the producer. If 
enabled, the
+        * producer can send a message with a different schema from the one 
specified when it
+        * was created, otherwise an invalid message exception would be thrown.
+        *
+        * <p>
+        * Enabled by default.
+        * @param multiSchema whether to enable or disable multiple schema mode
+        * @return the sender builder instance
+        */
        default ReactiveMessageSenderBuilder<T> multiSchema(boolean 
multiSchema) {
                getMutableSpec().setMultiSchema(multiSchema);
                return this;
        }
 
+       /**
+        * Sets the type of access mode that the producer requires on the 
topic. This setting
+        * applies to each producer created under this sender.
+        *
+        * <p>
+        * Possible values are:
+        * <ul>
+        * <li>{@link ProducerAccessMode#Shared}: By default multiple producers 
can publish on
+        * a topic
+        * <li>{@link ProducerAccessMode#Exclusive}: Require exclusive access 
for producer.
+        * Fail immediately if there's already a producer connected.
+        * <li>{@link ProducerAccessMode#WaitForExclusive}: Producer creation 
is pending until
+        * it can acquire exclusive access
+        * </ul>
+        * @param accessMode the access mode to set
+        * @return the producer builder instance
+        */
        default ReactiveMessageSenderBuilder<T> accessMode(ProducerAccessMode 
accessMode) {
                getMutableSpec().setAccessMode(accessMode);
                return this;
        }
 
+       /**
+        * Sets whether to start partitioned producers lazily. This setting 
applies to each
+        * producer created under this sender. This config affects Shared mode 
producers of
+        * partitioned topics only. It controls whether producers register and 
connect
+        * immediately to the owner broker of each partition or start lazily on 
demand. The
+        * internal producer of one partition is always started eagerly, chosen 
by the routing
+        * policy, but the internal producers of any additional partitions are 
started on
+        * demand, upon receiving their first message. Using this mode can 
reduce the strain
+        * on brokers for topics with large numbers of partitions and when the 
SinglePartition
+        * or some custom partial partition routing policy like
+        * PartialRoundRobinMessageRouterImpl is used without keyed messages. 
Because producer
+        * connection can be on demand, this can produce extra send latency for 
the first
+        * messages of a given partition.
+        * @param lazyStartPartitionedProducers whether to start partition 
producers lazily
+        * @return the sender builder instance
+        */
        default ReactiveMessageSenderBuilder<T> 
lazyStartPartitionedProducers(boolean lazyStartPartitionedProducers) {
                
getMutableSpec().setLazyStartPartitionedProducers(lazyStartPartitionedProducers);
                return this;
        }
 
+       /**
+        * Sets the properties to the producer. This setting applies to each 
producer created
+        * under this sender.
+        *
+        * <p>
+        * Properties are application defined metadata that can be attached to 
the sender.
+        * When getting the topic stats, this metadata will be associated to 
the sender stats
+        * for easier identification.
+        * @param properties the properties to set
+        * @return the sender builder instance
+        */
        default ReactiveMessageSenderBuilder<T> properties(Map<String, String> 
properties) {
                getMutableSpec().setProperties(properties);
                return this;
        }
 
+       /**
+        * Builds the reactive message sender.
+        * @return the reactive message sender
+        */
        ReactiveMessageSender<T> build();
 
 }

Reply via email to