This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 010d14879 Emit to diagnostic kafka topic in round robin fashion to
support increasing this topic partition count (#1631)
010d14879 is described below
commit 010d14879f401cc01f083cf84f32eb8741cc8398
Author: jia-gao <[email protected]>
AuthorDate: Tue Sep 27 18:44:07 2022 -0700
Emit to diagnostic kafka topic in round robin fashion to support increasing
this topic partition count (#1631)
Symptom: For some Samza jobs, the diagnostic stream partition grows very
large, which can cause issues in the underlying system (e.g., Kafka).
Cause:
Currently, the diagnostics topic for a Samza job is created with a single
partition, because it is created in DiagnosticsUtil with the default
partition count, which is 1.
It may be necessary to increase the partition count of the diagnostic
stream to resolve the issue above.
However, Samza currently uses the hostname as the partition key to distribute
diagnostic messages. Hostnames can be very similar, so hashing them as
partition keys does not distribute the messages evenly across partitions.
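
The effect of keying by hostname can be sketched as follows. This is a minimal illustration, not Samza's or Kafka's actual partitioning code: the class `KeyedPartitionSketch`, the helper `partitionFor`, the use of `String.hashCode()`, and the example hostnames are all assumptions made for the example. It shows one property that holds for any key-hash scheme: each host is pinned to a single partition, so the number of partitions that receive data can never exceed the number of distinct hosts, regardless of how many partitions the topic has.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class KeyedPartitionSketch {

    // Hypothetical stand-in for a key-hash partitioner:
    // hash the key, then reduce it modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Returns the partitions that receive at least one message
    // when every message is keyed by the host that emitted it.
    static Set<Integer> partitionsUsed(List<String> hosts, int messagesPerHost, int numPartitions) {
        Set<Integer> used = new TreeSet<>();
        for (String host : hosts) {
            for (int i = 0; i < messagesPerHost; i++) {
                // The same host always hashes to the same partition.
                used.add(partitionFor(host, numPartitions));
            }
        }
        return used;
    }

    public static void main(String[] args) {
        // Illustrative hostnames only; not taken from any real deployment.
        List<String> hosts = Arrays.asList(
            "host-0001.example.com", "host-0002.example.com", "host-0003.example.com");
        Set<Integer> used = partitionsUsed(hosts, 1000, 8);
        System.out.println("partitions used: " + used.size() + " of 8");
    }
}
```

With three hosts and eight partitions, at least five partitions stay empty under this scheme, and a single noisy host still fills only one partition.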
Change:
Use a null partition key instead of the hostname, and rely on the Kafka
default partitioner to distribute the events evenly across partitions.
Note that this change is backward compatible with the default partition
count (1), since every message goes to the same partition regardless of the
partition key.
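
The intended behavior with a null key can be sketched as follows. This is a simplified model, not the Kafka default partitioner itself: the class `NullKeyRoundRobinSketch` and its methods are hypothetical, and the real partitioner's handling of null keys depends on the Kafka client version (newer clients are understood to stick to one partition per batch rather than rotating on every record). The sketch assumes strict rotation to show why the message count evens out, and why a single-partition topic is unaffected.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class NullKeyRoundRobinSketch {

    private final AtomicInteger counter = new AtomicInteger(0);
    private final int numPartitions;

    NullKeyRoundRobinSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Hypothetical partitioner: a null key rotates through the partitions,
    // while a non-null key is hashed and therefore pinned to one partition.
    int partitionFor(Object key) {
        if (key == null) {
            return Math.floorMod(counter.getAndIncrement(), numPartitions);
        }
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Sends `messages` null-keyed messages and returns the per-partition counts.
    static int[] countsForNullKeys(int messages, int numPartitions) {
        NullKeyRoundRobinSketch partitioner = new NullKeyRoundRobinSketch(numPartitions);
        int[] counts = new int[numPartitions];
        for (int i = 0; i < messages; i++) {
            counts[partitioner.partitionFor(null)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Four partitions: the load is split evenly.
        System.out.println(Arrays.toString(countsForNullKeys(1000, 4))); // [250, 250, 250, 250]
        // One partition (the default): everything lands in partition 0, as before.
        System.out.println(Arrays.toString(countsForNullKeys(1000, 1))); // [1000]
    }
}
```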
API changes: None
Backward compatible: No. Note that the change is backward incompatible
only when the diagnostic stream has more than one partition. Samza creates
the diagnostic stream with 1 partition by default, but in some cases the
stream already exists or its partition count was increased after creation.
Test Done:
./gradlew build
---
.../src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
index f420a93c9..5715365fe 100644
--- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
@@ -240,7 +240,7 @@ public class DiagnosticsManager {
if (!diagnosticsStreamMessage.isEmpty()) {
systemProducer.send(DiagnosticsManager.class.getName(),
- new OutgoingMessageEnvelope(diagnosticSystemStream, hostname, null,
+ new OutgoingMessageEnvelope(diagnosticSystemStream, null, null,
new MetricsSnapshotSerdeV2().toBytes(diagnosticsStreamMessage.convertToMetricsSnapshot())));
systemProducer.flush(DiagnosticsManager.class.getName());