This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 9fa01bb716d CAMEL-19444: fixed a few grammar errors on documentation and code comments in camel-kafka (#10647)
9fa01bb716d is described below
commit 9fa01bb716d2391500c2b74a36c39ee51e08c18e
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Tue Jul 11 06:25:51 2023 +0200
CAMEL-19444: fixed a few grammar errors on documentation and code comments in camel-kafka (#10647)
---
.../camel-kafka/src/main/docs/kafka-component.adoc | 41 +++++++++++-----------
.../camel/component/kafka/KafkaComponent.java | 10 +++---
.../camel/component/kafka/KafkaConfiguration.java | 24 ++++++-------
.../consumer/errorhandler/BridgeErrorStrategy.java | 2 +-
.../errorhandler/DiscardErrorStrategy.java | 2 +-
.../KafkaConsumerAsyncManualCommitIT.java | 2 +-
.../integration/KafkaConsumerBatchSizeIT.java | 2 +-
7 files changed, 41 insertions(+), 42 deletions(-)
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 8405f58461a..0f49a0264ff 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -60,26 +60,25 @@ http://kafka.apache.org/documentation.html#producerconfigs[http://kafka.apache.o
include::partial$component-endpoint-headers.adoc[]
// component headers: END
-If you want to send a message to a dynamic topic then use `KafkaConstants.OVERRIDE_TOPIC` as it is used as a one-time header
-that is not send along the message, as it is removed in the producer.
+If you want to send a message to a dynamic topic then use `KafkaConstants.OVERRIDE_TOPIC` as it is used as a one-time header that is not sent along the message, and actually is removed in the producer.
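For illustration, a minimal sketch of the one-time header in a route (the topic names and broker address are placeholders, not from this commit):

[source,java]
----
// a sketch: this single message is redirected to my-dynamic-topic; the header
// is then removed by the producer and not sent along with the record
from("direct:start")
    .setHeader(KafkaConstants.OVERRIDE_TOPIC, constant("my-dynamic-topic"))
    .to("kafka:default-topic?brokers=localhost:9092");
----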
== Consumer error handling
While the kafka consumer is polling messages from the kafka broker, errors can happen. This section describes what happens and what
you can configure.
-The consumer may throw exception when invoking the Kafka `poll` API. For example, if the message cannot be de-serialized due invalid data,
-and many other kind of errors. Those errors are in the form of `KafkaException` which are either _retryable_ or not. The exceptions
-which can be retried (`RetriableException`) will be retried again (with a poll timeout in between). All other kind of exceptions are
+The consumer may throw an exception when invoking the Kafka `poll` API. For example, if the message cannot be deserialized due to invalid data,
+and many other kinds of errors. Those errors are in the form of `KafkaException` which are either _retryable_ or not. The exceptions
+which can be retried (`RetriableException`) will be retried again (with a poll timeout in between). All other kinds of exceptions are
handled according to the _pollOnError_ configuration. This configuration has the following values:
-* DISCARD will discard the message and continue to poll next message.
-* ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll next message.
-* RECONNECT will re-connect the consumer and try poll the message again.
+* DISCARD will discard the message and continue to poll the next message.
+* ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll the next message.
+* RECONNECT will re-connect the consumer and try to poll the message again.
* RETRY will let the consumer retry polling the same message again
-* STOP will stop the consumer (have to be manually started/restarted if the consumer should be able to consume messages again).
+* STOP will stop the consumer (it has to be manually started/restarted if the consumer should be able to consume messages again).
-The default is *ERROR_HANDLER* which will let Camel's error handler (if any configured) process the caused exception.
+The default is *ERROR_HANDLER*, which will let Camel's error handler (if any configured) process the caused exception.
Afterwards continue to poll the next message. This behavior is similar to the _bridgeErrorHandler_ option that
Camel components have.
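For reference, the strategy is selected per endpoint through the pollOnError URI option; a minimal sketch (topic, broker, and the chosen value are placeholders):

[source,java]
----
// a sketch: poll errors on this endpoint are retried instead of going to the error handler
from("kafka:test?brokers=localhost:9092&pollOnError=RETRY")
    .to("log:received");
----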
@@ -102,7 +101,7 @@ from("kafka:test?brokers=localhost:9092")
.log(" with the key ${headers[kafka.KEY]}")
----
-If you need to consume messages from multiple topics you can use a comma separated list of topic names.
+If you need to consume messages from multiple topics, you can use a comma separated list of topic names.
[source,java]
----
@@ -126,8 +125,8 @@
from("kafka:test*?brokers=localhost:9092&topicIsPattern=true")
.log(" with the key ${headers[kafka.KEY]}")
----
-When consuming messages from Kafka you can use your own offset management and not delegate this management to Kafka.
-In order to keep the offsets the component needs a `StateRepository` implementation such as `FileStateRepository`.
+When consuming messages from Kafka, you can use your own offset management and not delegate this management to Kafka.
+In order to keep the offsets, the component needs a `StateRepository` implementation such as `FileStateRepository`.
This bean should be available in the registry.
Here is how to use it:
@@ -228,7 +227,7 @@ camelContext.addRoutes(new RouteBuilder() {
The `camel-kafka` library provides a Kafka topic-based idempotent repository.
This repository broadcasts all changes to idempotent state (add/remove) in a Kafka topic, and populates a local in-memory cache for each repository's process instance through event sourcing.
The topic used must be unique per idempotent repository instance. The mechanism does not have any requirements about the number of topic partitions, as the repository consumes from all partitions at the same time. It also does not have any requirements about the replication factor of the topic.
-Each repository instance that uses the topic (e.g. typically on different machines running in parallel) controls its own consumer group, so in a cluster of 10 Camel processes using the same topic each will control its own offset.
+Each repository instance that uses the topic (e.g., typically on different machines running in parallel) controls its own consumer group, so in a cluster of 10 Camel processes using the same topic, each will control its own offset.
On startup, the instance subscribes to the topic, rewinds the offset to the
beginning and rebuilds the cache to the latest state. The cache will not be
considered warmed up until one poll of `pollDurationMs` in length returns 0
records. Startup will not be completed until either the cache has warmed up, or
30 seconds go by; if the latter happens the idempotent repository may be in an
inconsistent state until its consumer catches up to the end of the topic.
Be mindful of the format of the header used for the uniqueness check. By
default, it uses Strings as the data types. When using primitive numeric
formats, the header must be deserialized accordingly. Check the samples below
for examples.
@@ -243,7 +242,7 @@ A `KafkaIdempotentRepository` has the following properties:
| maxCacheSize | How many of the most recently used keys should be stored in memory (default 1000).
| pollDurationMs | The poll duration of the Kafka consumer. The local caches are updated immediately. This value will affect how far behind other peers that update their caches from the topic are relative to the idempotent consumer instance that sent the cache action message. The default value of this is 100 ms. +
If setting this value explicitly, be aware that there is a tradeoff between the remote cache liveness and the volume of network traffic between this repository's consumer and the Kafka brokers. The cache warmup process also depends on there being one poll that fetches nothing - this indicates that the stream has been consumed up to the current point. If the poll duration is excessively long for the rate at which messages are sent on the topic, there exists a possibility that the cache ca [...]
-| groupId | The groupId to assign to the idempotent consumer. If not specified it will be randomize.
+| groupId | The groupId to assign to the idempotent consumer. If not specified it will be randomized.
|===
The repository can be instantiated by defining the `topic` and
`bootstrapServers`, or the `producerConfig` and `consumerConfig` property sets
can be explicitly defined to enable features such as SSL/SASL.
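A minimal sketch of the simple constructor-based setup (topic name, broker address, and header name are placeholders):

[source,java]
----
// a sketch: topic + bootstrap servers is enough when SSL/SASL is not needed
KafkaIdempotentRepository repository = new KafkaIdempotentRepository("idempotent-topic", "localhost:9092");

from("direct:in")
    .idempotentConsumer(header("messageId"), repository)
    .to("mock:out");
----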
@@ -346,7 +345,7 @@ from(from).routeId("foo")
By default, the Kafka consumer will use auto commit, where the offset will be
committed automatically in the background using a given interval.
In case you want to force manual commits, you can use `KafkaManualCommit` API
from the Camel Exchange, stored on the message header.
-This requires to turn on manual commits by either setting the option `allowManualCommit` to `true` on the `KafkaComponent`
+This requires turning on manual commits by either setting the option `allowManualCommit` to `true` on the `KafkaComponent`
or on the endpoint, for example:
[source,java]
@@ -404,9 +403,9 @@ Producing flow backed by same behaviour - camel headers of particular exchange w
Since kafka headers allow only `byte[]` values, in order for a camel exchange header to be propagated its value should be serialized to `byte[]`, otherwise the header will be skipped.
-Following header value types are supported: `String`, `Integer`, `Long`, `Double`, `Boolean`, `byte[]`.
+The following header value types are supported: `String`, `Integer`, `Long`, `Double`, `Boolean`, `byte[]`.
Note: all headers propagated *from* kafka *to* camel exchange will contain
`byte[]` value by default.
-In order to override default functionality uri parameters can be set: `headerDeserializer` for `from` route and `headerSerializer` for `to` route. Example:
+In order to override default functionality, these uri parameters can be set: `headerDeserializer` for `from` route and `headerSerializer` for `to` route. For example:
[source,java]
----
@@ -426,7 +425,7 @@ from("kafka:my_topic?headerFilterStrategy=#myStrategy")
.to("kafka:my_topic?headerFilterStrategy=#myStrategy")
----
-`myStrategy` object should be subclass of `HeaderFilterStrategy` and must be placed in the Camel registry, either manually or by registration as a bean in Spring/Blueprint, as it is `CamelContext` aware.
+`myStrategy` object should be a subclass of `HeaderFilterStrategy` and must be placed in the Camel registry, either manually or by registration as a bean in Spring/Blueprint, as it is `CamelContext` aware.
== Kafka Transaction
@@ -438,7 +437,7 @@ from("direct:transaction")
----
At the end of exchange routing, the kafka producer would commit the transaction or abort it if an Exception is thrown or the exchange is `RollbackOnly`. Since Kafka does not support transactions in multiple threads, it will throw `ProducerFencedException` if there is another producer with the same `transaction.id` making the transactional request.
-It would work with JTA `camel-jta` by using `transacted()` and if it involves some resources (SQL or JMS) which supports XA, then they would work in tandem, where they both will either commit or rollback at the end of the exchange routing. In some cases, if the JTA transaction manager fails to commit (during the 2PC processing), but kafka transaction has been committed before and there is no chance to rollback the changes since the kafka transaction does not support JTA/XA spec. There is [...]
+It would work with JTA `camel-jta` by using `transacted()` and if it involves some resources (SQL or JMS) which supports XA, then they would work in tandem, where they both will either commit or rollback at the end of the exchange routing. In some cases, if the JTA transaction manager fails to commit (during the 2PC processing), but kafka transaction has been committed before and there is no chance to roll back the changes since the kafka transaction does not support JTA/XA spec. There i [...]
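A rough sketch of that combination (the SQL endpoint is a hypothetical XA-enlisted resource; the kafka endpoint parameters follow the transactional-producer settings shown earlier on this page):

[source,java]
----
// a sketch: JTA transaction spanning an XA resource and a transactional kafka producer
from("direct:transaction")
    .transacted()
    .to("sql:insert into audit_log (id) values (:#id)") // hypothetical XA resource
    .to("kafka:my_topic?additional-properties[transactional.id]=tx-1&additional-properties[enable.idempotence]=true&additional-properties[retries]=5");
----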
== Setting Kerberos config file
@@ -449,4 +448,4 @@ static {
KafkaComponent.setKerberosConfigLocation("path/to/config/file");
}
----
-include::spring-boot:partial$starter.adoc[]
\ No newline at end of file
+include::spring-boot:partial$starter.adoc[]
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 72a2ef51c17..2f2618939f7 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -171,12 +171,12 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
*
* Error during creating the consumer may be fatal due to invalid configuration and as such recovery is not
* possible. However, one part of the validation is DNS resolution of the bootstrap broker hostnames. This may be a
- * temporary networking problem, and could potentially be recoverable. While other errors are fatal such as some
- * invalid kafka configurations. Unfortunately kafka-client does not separate this kind of errors.
+ * temporary networking problem, and could potentially be recoverable. While other errors are fatal, such as some
+ * invalid kafka configurations. Unfortunately, kafka-client does not separate these kinds of errors.
*
* Camel will by default retry forever, and therefore never give up. If you want to give up after many attempts then
- * set this option and Camel will then when giving up terminate the consumer. You can manually restart the consumer
- * by stopping and starting the route, to try again.
+ * set this option and Camel will then, when giving up, terminate the consumer. To try again, you can manually restart the consumer
+ * by stopping and starting the route.
*/
public void setCreateConsumerBackoffMaxAttempts(int createConsumerBackoffMaxAttempts) {
this.createConsumerBackoffMaxAttempts = createConsumerBackoffMaxAttempts;
@@ -204,7 +204,7 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
* Error during subscribing the consumer to the kafka topic could be temporary errors due to network issues, and
* could potentially be recoverable.
*
- * Camel will by default retry forever, and therefore never give up. If you want to give up after many attempts then
+ * Camel will by default retry forever, and therefore never give up. If you want to give up after many attempts, then
* set this option and Camel will then when giving up terminate the consumer. You can manually restart the consumer
* by stopping and starting the route, to try again.
*/
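For reference, the option above is a plain component setter; a minimal sketch of capping the retries (the attempt count is arbitrary):

[source,java]
----
// a sketch: give up creating the consumer after 10 attempts instead of retrying forever
KafkaComponent kafka = camelContext.getComponent("kafka", KafkaComponent.class);
kafka.setCreateConsumerBackoffMaxAttempts(10);
----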
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 20686f13463..a695f2028fb 100755
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -824,7 +824,7 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
* <tt>false</tt> then the consumer continues to the next message and processes it. If the option is <tt>true</tt>
* then the consumer breaks out, and will seek back to offset of the message that caused a failure, and then
* re-attempt to process this message. However this can lead to endless processing of the same message if its bound
- * to fail every time, eg a poison message. Therefore its recommended to deal with that for example by using Camel's
+ * to fail every time, eg a poison message. Therefore it is recommended to deal with that for example by using Camel's
* error handler.
*/
public void setBreakOnFirstError(boolean breakOnFirstError) {
@@ -889,7 +889,7 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
/**
* Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been
- * elected. Since leader election takes a bit of time, this property specifies the amount of time that the producer
+ * elected. Since the leader election takes a bit of time, this property specifies the amount of time that the producer
* waits before refreshing the metadata.
*/
public void setRetryBackoffMs(Integer retryBackoffMs) {
@@ -1220,8 +1220,8 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
}
/**
- * The location of the key store file. This is optional for client and can be used for two-way authentication for
- * client.
+ * The location of the key store file. This is optional for the client and can be used for two-way authentication for
+ * the client.
*/
public void setSslKeystoreLocation(String sslKeystoreLocation) {
this.sslKeystoreLocation = sslKeystoreLocation;
@@ -1232,7 +1232,7 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
}
/**
- * The store password for the key store file. This is optional for client and only needed if sslKeystoreLocation' is
+ * The store password for the key store file. This is optional for the client and only needed if 'sslKeystoreLocation' is
* configured. Key store password is not supported for PEM format.
*/
public void setSslKeystorePassword(String sslKeystorePassword) {
@@ -1355,8 +1355,8 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
/**
* The producer will attempt to batch records together into fewer requests whenever multiple records are being sent
* to the same partition. This helps performance on both the client and the server. This configuration controls the
- * default batch size in bytes. No attempt will be made to batch records larger than this size.Requests sent to
- * brokers will contain multiple batches, one for each partition with data available to be sent.A small batch size
+ * default batch size in bytes. No attempt will be made to batch records larger than this size. Requests sent to
+ * brokers will contain multiple batches, one for each partition with data available to be sent. A small batch size
* will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A
* very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified
* batch size in anticipation of additional records.
@@ -1399,7 +1399,7 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
* request. Normally this occurs only under load when records arrive faster than they can be sent out. However in
* some circumstances the client may want to reduce the number of requests even under moderate load. This setting
* accomplishes this by adding a small amount of artificial delay that is, rather than immediately sending out a
- * record the producer will wait for up to the given delay to allow other records to be sent so that the sends can
+ * record the producer will wait for up to the given delay to allow other records to be sent so that they can
* be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the
* upper bound on the delay for batching: once we get batch.size worth of records for a partition it will be sent
* immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this
@@ -1421,7 +1421,7 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
* timeout bounds the total time waiting for both metadata fetch and buffer allocation (blocking in the
* user-supplied serializers or partitioner is not counted against this timeout). For partitionsFor() this timeout
* bounds the time spent waiting for metadata if it is unavailable. The transaction-related methods always block,
- * but may timeout if the transaction coordinator could not be discovered or did not respond within the timeout.
+ * but may time out if the transaction coordinator could not be discovered or did not respond within the timeout.
*/
public void setMaxBlockMs(Integer maxBlockMs) {
this.maxBlockMs = maxBlockMs;
@@ -1641,7 +1641,7 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
}
/**
- * Deserializer class for key that implements the Deserializer interface.
+ * Deserializer class for the key that implements the Deserializer interface.
*/
public void setKeyDeserializer(String keyDeserializer) {
this.keyDeserializer = keyDeserializer;
@@ -1663,8 +1663,8 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
}
/**
- * Set if KafkaConsumer will read from beginning or end on startup: SeekPolicy.BEGINNING: read from beginning.
- * SeekPolicy.END: read from end.
+ * Set if KafkaConsumer will read from the beginning or the end on startup: SeekPolicy.BEGINNING: read from the beginning.
+ * SeekPolicy.END: read from the end.
*/
public void setSeekTo(SeekPolicy seekTo) {
this.seekTo = seekTo;
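Most of the options touched in this file also surface as endpoint URI parameters; a short sketch combining two of them (topic and broker are placeholders):

[source,java]
----
// a sketch: read from the earliest offset and stop the consumer flow on the first failing message
from("kafka:test?brokers=localhost:9092&seekTo=BEGINNING&breakOnFirstError=true")
    .to("log:consumed");
----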
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java
index f8b00c92923..098d8d10a36 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/BridgeErrorStrategy.java
@@ -47,7 +47,7 @@ public class BridgeErrorStrategy implements PollExceptionStrategy {
// use bridge error handler to route with exception
recordFetcher.getBridge().handleException(exception);
- // skip this poison message and seek to next message
+ // skip this poison message and seek to the next message
SeekUtil.seekToNextOffset(consumer, partitionLastOffset);
if (exception instanceof AuthenticationException || exception instanceof AuthorizationException) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/DiscardErrorStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/DiscardErrorStrategy.java
index d5424003f00..46c61b92d9e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/DiscardErrorStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/DiscardErrorStrategy.java
@@ -39,7 +39,7 @@ public class DiscardErrorStrategy implements PollExceptionStrategy {
public void handle(long partitionLastOffset, Exception exception) {
LOG.warn("Requesting the consumer to discard the message and continue to the next based on polling exception strategy");
- // skip this poison message and seek to next message
+ // skip this poison message and seek to the next message
SeekUtil.seekToNextOffset(consumer, partitionLastOffset);
}
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
index 06e6b5ac1e2..a9b966bb961 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
@@ -159,7 +159,7 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
MockEndpoint to = contextExtension.getMockEndpoint(KafkaTestUtil.MOCK_RESULT);
// Fourth step: We start again our route, since we have been committing the offsets from the first step,
- // we will expect to consume from the latest committed offset i.e. from offset 5
+ // we will expect to consume from the latest committed offset (i.e., from offset 5)
context.getRouteController().startRoute("foo");
to.expectedMessageCount(3);
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBatchSizeIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBatchSizeIT.java
index 388a8b22d17..6be326622ce 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBatchSizeIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBatchSizeIT.java
@@ -64,7 +64,7 @@ public class KafkaConsumerBatchSizeIT extends BaseEmbeddedKafkaTestSupport {
public void kafkaMessagesIsConsumedByCamel() throws Exception {
MockEndpoint to = contextExtension.getMockEndpoint(KafkaTestUtil.MOCK_RESULT);
- // First 2 must not be committed since batch size is 3
+ // The first 2 must not be committed since batch size is 3
to.expectedBodiesReceivedInAnyOrder("m1", "m2");
for (int k = 1; k <= 2; k++) {
String msg = "m" + k;