This is an automated email from the ASF dual-hosted git repository.

aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b729be0  [GOBBLIN-1546] Don't contact schema registry for every record 
read by job status monitor (#3397)
b729be0 is described below

commit b729be0a94ffa61d5b4b3690859b365bd567f6b2
Author: Jack Moseley <[email protected]>
AuthorDate: Mon Sep 20 10:18:44 2021 -0700

    [GOBBLIN-1546] Don't contact schema registry for every record read by job 
status monitor (#3397)
    
    The Kafka job status monitor currently calls 
readSchemaVersioningInformation to advance the input stream of each Kafka 
message it reads to the record portion. That method contacts the schema 
registry on every call, yet the schema it returns is ignored. This PR adds 
another API that only advances the input stream, without contacting the schema 
registry, to remove a point of failure.
---
 .../reporter/util/FixedSchemaVersionWriter.java    |  5 +++++
 .../reporter/util/NoopSchemaVersionWriter.java     |  4 ++++
 .../metrics/reporter/util/SchemaVersionWriter.java |  6 ++++++
 .../reporter/util/SchemaRegistryVersionWriter.java | 23 +++++++++++++++-------
 .../monitoring/KafkaAvroJobStatusMonitor.java      |  2 +-
 5 files changed, 32 insertions(+), 8 deletions(-)

diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/FixedSchemaVersionWriter.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/FixedSchemaVersionWriter.java
index 4778c21..331e1e8 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/FixedSchemaVersionWriter.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/FixedSchemaVersionWriter.java
@@ -44,4 +44,9 @@ public class FixedSchemaVersionWriter implements 
SchemaVersionWriter<Integer> {
       throws IOException {
     return inputStream.readInt();
   }
+
+  @Override
+  public void advanceInputStreamToRecord(DataInputStream inputStream) throws 
IOException {
+    inputStream.readInt();
+  }
 }
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/NoopSchemaVersionWriter.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/NoopSchemaVersionWriter.java
index 32021d0..b6a0125 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/NoopSchemaVersionWriter.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/NoopSchemaVersionWriter.java
@@ -42,4 +42,8 @@ public class NoopSchemaVersionWriter implements 
SchemaVersionWriter<Void> {
       throws IOException {
     return null;
   }
+
+  @Override
+  public void advanceInputStreamToRecord(DataInputStream inputStream) throws 
IOException {
+  }
 }
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaVersionWriter.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaVersionWriter.java
index c96b1f0..903e53e 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaVersionWriter.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaVersionWriter.java
@@ -54,4 +54,10 @@ public interface SchemaVersionWriter<S> {
    */
   public S readSchemaVersioningInformation(DataInputStream inputStream) throws 
IOException;
 
+  /**
+   * Advance inputStream to the location where actual record starts, but 
ignore the schema information.
+   * @param inputStream {@link java.io.DataInputStream} containing schema 
information and serialized record.
+   * @throws IOException
+   */
+  public void advanceInputStreamToRecord(DataInputStream inputStream) throws 
IOException;
 }
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaRegistryVersionWriter.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaRegistryVersionWriter.java
index 9c52f10..4c4eccd 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaRegistryVersionWriter.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/SchemaRegistryVersionWriter.java
@@ -117,6 +117,21 @@ public class SchemaRegistryVersionWriter implements 
SchemaVersionWriter<Schema>
   @Override
   public Schema readSchemaVersioningInformation(DataInputStream inputStream)
       throws IOException {
+    String hexKey = getSchemaHexKey(inputStream);
+
+    try {
+      return this.registry.getSchemaByKey(hexKey);
+    } catch (SchemaRegistryException sre) {
+      throw new IOException("Failed to retrieve schema for key " + hexKey, 
sre);
+    }
+  }
+
+  @Override
+  public void advanceInputStreamToRecord(DataInputStream inputStream) throws 
IOException {
+    getSchemaHexKey(inputStream);
+  }
+
+  private String getSchemaHexKey(DataInputStream inputStream) throws 
IOException {
     if (inputStream.readByte() != KafkaAvroSchemaRegistry.MAGIC_BYTE) {
       throw new IOException("MAGIC_BYTE not found in Avro message.");
     }
@@ -128,12 +143,6 @@ public class SchemaRegistryVersionWriter implements 
SchemaVersionWriter<Schema>
           .format("Could not read enough bytes for schema id. Expected: %d, 
found: %d.", schemaIdLengthBytes,
               bytesRead));
     }
-    String hexKey = Hex.encodeHexString(byteKey);
-
-    try {
-      return this.registry.getSchemaByKey(hexKey);
-    } catch (SchemaRegistryException sre) {
-      throw new IOException("Failed to retrieve schema for key " + hexKey, 
sre);
-    }
+    return Hex.encodeHexString(byteKey);
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index aded7fb..dd6b343 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -94,7 +94,7 @@ public class KafkaAvroJobStatusMonitor extends 
KafkaJobStatusMonitor {
   public GobblinTrackingEvent 
deserializeEvent(DecodeableKafkaRecord<byte[],byte[]> message) {
     try {
       InputStream is = new ByteArrayInputStream(message.getValue());
-      schemaVersionWriter.readSchemaVersioningInformation(new 
DataInputStream(is));
+      schemaVersionWriter.advanceInputStreamToRecord(new DataInputStream(is));
       Decoder decoder = DecoderFactory.get().binaryDecoder(is, 
this.decoder.get());
 
       return this.reader.get().read(null, decoder);

Reply via email to