This is an automated email from the ASF dual-hosted git repository.
aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b729be0 [GOBBLIN-1546] Don't contact schema registry for every record
read by job status monitor (#3397)
b729be0 is described below
commit b729be0a94ffa61d5b4b3690859b365bd567f6b2
Author: Jack Moseley <[email protected]>
AuthorDate: Mon Sep 20 10:18:44 2021 -0700
[GOBBLIN-1546] Don't contact schema registry for every record read by job
status monitor (#3397)
The Kafka job status monitor currently calls
readSchemaVersioningInformation only to advance the input stream of each Kafka
message it reads to the start of the record payload. That method contacts the
schema registry on every call, even though its result is ignored. This PR adds a
new API, advanceInputStreamToRecord, that advances the input stream without
contacting the schema registry, removing a point of failure.
---
.../reporter/util/FixedSchemaVersionWriter.java | 5 +++++
.../reporter/util/NoopSchemaVersionWriter.java | 4 ++++
.../metrics/reporter/util/SchemaVersionWriter.java | 6 ++++++
.../reporter/util/SchemaRegistryVersionWriter.java | 23 +++++++++++++++-------
.../monitoring/KafkaAvroJobStatusMonitor.java | 2 +-
5 files changed, 32 insertions(+), 8 deletions(-)
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/FixedSchemaVersionWriter.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/FixedSchemaVersionWriter.java
index 4778c21..331e1e8 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/FixedSchemaVersionWriter.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/FixedSchemaVersionWriter.java
@@ -44,4 +44,9 @@ public class FixedSchemaVersionWriter implements
SchemaVersionWriter<Integer> {
throws IOException {
return inputStream.readInt();
}
+
+ @Override
+ public void advanceInputStreamToRecord(DataInputStream inputStream) throws
IOException {
+ inputStream.readInt();
+ }
}
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/NoopSchemaVersionWriter.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/NoopSchemaVersionWriter.java
index 32021d0..b6a0125 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/NoopSchemaVersionWriter.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/NoopSchemaVersionWriter.java
@@ -42,4 +42,8 @@ public class NoopSchemaVersionWriter implements
SchemaVersionWriter<Void> {
throws IOException {
return null;
}
+
+ @Override
+ public void advanceInputStreamToRecord(DataInputStream inputStream) throws
IOException {
+ }
}
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaVersionWriter.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaVersionWriter.java
index c96b1f0..903e53e 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaVersionWriter.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaVersionWriter.java
@@ -54,4 +54,10 @@ public interface SchemaVersionWriter<S> {
*/
public S readSchemaVersioningInformation(DataInputStream inputStream) throws
IOException;
+ /**
+ * Advance inputStream to the location where actual record starts, but
ignore the schema information.
+ * @param inputStream {@link java.io.DataInputStream} containing schema
information and serialized record.
+ * @throws IOException
+ */
+ public void advanceInputStreamToRecord(DataInputStream inputStream) throws
IOException;
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaRegistryVersionWriter.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaRegistryVersionWriter.java
index 9c52f10..4c4eccd 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaRegistryVersionWriter.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaRegistryVersionWriter.java
@@ -117,6 +117,21 @@ public class SchemaRegistryVersionWriter implements
SchemaVersionWriter<Schema>
@Override
public Schema readSchemaVersioningInformation(DataInputStream inputStream)
throws IOException {
+ String hexKey = getSchemaHexKey(inputStream);
+
+ try {
+ return this.registry.getSchemaByKey(hexKey);
+ } catch (SchemaRegistryException sre) {
+ throw new IOException("Failed to retrieve schema for key " + hexKey,
sre);
+ }
+ }
+
+ @Override
+ public void advanceInputStreamToRecord(DataInputStream inputStream) throws
IOException {
+ getSchemaHexKey(inputStream);
+ }
+
+ private String getSchemaHexKey(DataInputStream inputStream) throws
IOException {
if (inputStream.readByte() != KafkaAvroSchemaRegistry.MAGIC_BYTE) {
throw new IOException("MAGIC_BYTE not found in Avro message.");
}
@@ -128,12 +143,6 @@ public class SchemaRegistryVersionWriter implements
SchemaVersionWriter<Schema>
.format("Could not read enough bytes for schema id. Expected: %d,
found: %d.", schemaIdLengthBytes,
bytesRead));
}
- String hexKey = Hex.encodeHexString(byteKey);
-
- try {
- return this.registry.getSchemaByKey(hexKey);
- } catch (SchemaRegistryException sre) {
- throw new IOException("Failed to retrieve schema for key " + hexKey,
sre);
- }
+ return Hex.encodeHexString(byteKey);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index aded7fb..dd6b343 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -94,7 +94,7 @@ public class KafkaAvroJobStatusMonitor extends
KafkaJobStatusMonitor {
public GobblinTrackingEvent
deserializeEvent(DecodeableKafkaRecord<byte[],byte[]> message) {
try {
InputStream is = new ByteArrayInputStream(message.getValue());
- schemaVersionWriter.readSchemaVersioningInformation(new
DataInputStream(is));
+ schemaVersionWriter.advanceInputStreamToRecord(new DataInputStream(is));
Decoder decoder = DecoderFactory.get().binaryDecoder(is,
this.decoder.get());
return this.reader.get().read(null, decoder);