This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f3dc04d  [GOBBLIN-825] Initialize message schema at object construction rather than creating a new instance for every message
f3dc04d is described below

commit f3dc04da9d05546306fc1d7310a350a3aec76777
Author: vbohra <[email protected]>
AuthorDate: Mon Jul 15 14:38:21 2019 -0700

    [GOBBLIN-825] Initialize message schema at object construction rather than creating a new instance for every message
    
    Closes #2686 from vikrambohra/master
---
 .../reporter/KeyValueEventObjectReporter.java       | 19 ++++++++++++-------
 .../reporter/KeyValueMetricObjectReporter.java      | 18 +++++++++++-------
 .../reporter/KeyValueEventObjectReporterTest.java   |  1 +
 .../reporter/KeyValueMetricObjectReporterTest.java  |  1 +
 .../java/org/apache/gobblin/util/AvroUtils.java     | 19 ++++++++++++++++++-
 .../java/org/apache/gobblin/util/AvroUtilsTest.java | 21 +++++++++++++++++++++
 6 files changed, 64 insertions(+), 15 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporter.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporter.java
index fb4e006..b97e48c 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporter.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporter.java
@@ -17,12 +17,14 @@
 
 package org.apache.gobblin.metrics.reporter;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Random;
 import java.util.StringJoiner;
 
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.lang3.tuple.Pair;
 
@@ -56,15 +58,11 @@ public class KeyValueEventObjectReporter extends 
EventReporter {
   protected List<String> keys;
   protected final String randomKey;
   protected KeyValuePusher pusher;
-  protected Optional<Map<String, String>> namespaceOverride;
-  protected final String topic;
+  protected final Schema schema;
 
   public KeyValueEventObjectReporter(Builder builder) {
     super(builder);
 
-    this.topic = builder.topic;
-    this.namespaceOverride = builder.namespaceOverride;
-
     Config config = builder.config.get();
     Config pusherConfig = ConfigUtils.getConfigOrEmpty(config, 
PUSHER_CONFIG).withFallback(config);
     String pusherClassName =
@@ -83,17 +81,24 @@ public class KeyValueEventObjectReporter extends 
EventReporter {
           "Key not assigned from config. Please set it with property {} Using 
randomly generated number {} as key ",
           ConfigurationKeys.METRICS_REPORTING_EVENTS_PUSHERKEYS, randomKey);
     }
+
+    schema = 
AvroUtils.overrideNameAndNamespace(GobblinTrackingEvent.getClassSchema(), 
builder.topic, builder.namespaceOverride);
   }
 
   @Override
   public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
-    log.info("Emitting report using KeyValueEventObjectReporter");
 
     List<Pair<String, GenericRecord>> events = Lists.newArrayList();
     GobblinTrackingEvent event;
 
     while (null != (event = queue.poll())) {
-      GenericRecord record = AvroUtils.overrideNameAndNamespace(event, 
this.topic, this.namespaceOverride);
+
+      GenericRecord record=event;
+      try {
+        record = AvroUtils.convertRecordSchema(event, schema);
+      } catch (IOException e){
+        log.error("Unable to generate generic data record", e);
+      }
       events.add(Pair.of(buildKey(record), record));
     }
 
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporter.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporter.java
index 96372c0..6b6309c 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporter.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporter.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.StringJoiner;
 
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.lang3.tuple.Pair;
 
@@ -36,7 +37,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.MetricReport;
 import org.apache.gobblin.metrics.kafka.PusherUtils;
-import org.apache.gobblin.metrics.reporter.MetricReportReporter;
 import org.apache.gobblin.util.AvroUtils;
 import org.apache.gobblin.util.ConfigUtils;
 
@@ -57,15 +57,11 @@ public class KeyValueMetricObjectReporter extends 
MetricReportReporter {
   private List<String> keys;
   protected final String randomKey;
   protected KeyValuePusher pusher;
-  private Optional<Map<String, String>> namespaceOverride;
-  protected final String topic;
+  protected final Schema schema;
 
   public KeyValueMetricObjectReporter(Builder builder, Config config) {
     super(builder, config);
 
-    this.topic = builder.topic;
-    this.namespaceOverride = builder.namespaceOverride;
-
     Config pusherConfig = ConfigUtils.getConfigOrEmpty(config, 
PUSHER_CONFIG).withFallback(config);
     String pusherClassName =
         ConfigUtils.getString(config, PUSHER_CLASS, 
PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME);
@@ -83,11 +79,19 @@ public class KeyValueMetricObjectReporter extends 
MetricReportReporter {
           "Key not assigned from config. Please set it with property {} Using 
randomly generated number {} as key ",
           ConfigurationKeys.METRICS_REPORTING_PUSHERKEYS, randomKey);
     }
+
+    schema = AvroUtils.overrideNameAndNamespace(MetricReport.getClassSchema(), 
builder.topic, builder.namespaceOverride);
   }
 
   @Override
   protected void emitReport(MetricReport report) {
-    GenericRecord record = AvroUtils.overrideNameAndNamespace(report, this.topic, this.namespaceOverride);
+    GenericRecord record; // assigned only when schema conversion succeeds
+    try {
+      record = AvroUtils.convertRecordSchema(report, schema);
+    } catch (IOException e) {
+      log.error("Unable to generate generic data record", e);
+      return; // drop this report rather than push a record that failed conversion
+    }
     this.pusher.pushKeyValueMessages(Lists.newArrayList(Pair.of(buildKey(record), record)));
   }
 
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporterTest.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporterTest.java
index a02ce3f..d2bf63f 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporterTest.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporterTest.java
@@ -106,6 +106,7 @@ public class KeyValueEventObjectReporterTest extends 
KeyValueEventObjectReporter
     Assert.assertEquals(retrievedEvent.getValue().get("name"), eventName);
     int partition = Integer.parseInt(retrievedEvent.getKey());
     Assert.assertTrue((0 <= partition && partition <= 99));
+    Assert.assertTrue(retrievedEvent.getValue().getSchema() == 
reporter.schema);
   }
 
   private static Pair<String, GenericRecord> nextKVEvent(Iterator<Pair<String, 
GenericRecord>> it) {
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporterTest.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporterTest.java
index 32b9620..f674b0e 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporterTest.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporterTest.java
@@ -95,6 +95,7 @@ public class KeyValueMetricObjectReporterTest extends 
KeyValueMetricObjectReport
     Assert.assertEquals(retrievedEvent.getValue().getSchema().getName(), name);
     int partition = Integer.parseInt(retrievedEvent.getKey());
     Assert.assertTrue((0 <= partition && partition <= 99));
+    Assert.assertTrue(retrievedEvent.getValue().getSchema() == 
reporter.schema);
 
     reporter.close();
   }
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 43f03f7..2ea7e10 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -899,7 +899,7 @@ public class AvroUtils {
    * @param input input record who's name and namespace need to be overridden
    * @param nameOverride new name for the record schema
    * @param namespaceOverride Optional map containing namespace overrides
-   * @return an output record with overriden name and possibly namespace
+   * @return an output record with overridden name and possibly namespace
    */
   public static GenericRecord overrideNameAndNamespace(GenericRecord input, 
String nameOverride, Optional<Map<String, String>> namespaceOverride) {
 
@@ -918,4 +918,21 @@ public class AvroUtils {
     return output;
   }
 
+  /**
+   * Given an input schema, override its name and namespace and return the resulting schema.
+   * @param input schema whose name and namespace need to be overridden
+   * @param nameOverride new name for the schema
+   * @param namespaceOverride Optional map containing namespace overrides; namespaces are switched only when present
+   * @return a schema with overridden name and possibly namespace
+   */
+  public static Schema overrideNameAndNamespace(Schema input, String nameOverride, Optional<Map<String, String>> namespaceOverride) {
+
+    Schema newSchema = switchName(input, nameOverride);
+    if (namespaceOverride.isPresent()) {
+      newSchema = switchNamespace(newSchema, namespaceOverride.get());
+    }
+
+    return newSchema;
+  }
+
 }
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
index 8031c7e..248f21f 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
@@ -514,4 +514,25 @@ public class AvroUtilsTest {
 
   }
 
+  @Test
+  public void overrideSchemaNameAndNamespaceTest() {
+    String inputName = "input_name";
+    String inputNamespace = "input_namespace";
+    String outputName = "output_name";
+    String outputNamespace = "output_namespace";
+    // One-field record schema whose name and namespace will be overridden
+    Schema inputSchema = SchemaBuilder.record(inputName).namespace(inputNamespace).fields()
+        .name("integer1")
+        .type().intBuilder().endInt().noDefault()
+        .endRecord();
+
+    Map<String, String> namespaceOverrideMap = new HashMap<>();
+    namespaceOverrideMap.put(inputNamespace, outputNamespace);
+
+    Schema newSchema = AvroUtils.overrideNameAndNamespace(inputSchema, outputName, Optional.of(namespaceOverrideMap));
+
+    Assert.assertEquals(newSchema.getName(), outputName);
+    Assert.assertEquals(newSchema.getNamespace(), outputNamespace);
+  }
+
 }

Reply via email to