This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f3dc04d [GOBBLIN-825] Initialize message schema at object
construction rather than creating a new instance for every message
f3dc04d is described below
commit f3dc04da9d05546306fc1d7310a350a3aec76777
Author: vbohra <[email protected]>
AuthorDate: Mon Jul 15 14:38:21 2019 -0700
[GOBBLIN-825] Initialize message schema at object construction rather than
creating a new instance for every message
Closes #2686 from vikrambohra/master
---
.../reporter/KeyValueEventObjectReporter.java | 19 ++++++++++++-------
.../reporter/KeyValueMetricObjectReporter.java | 18 +++++++++++-------
.../reporter/KeyValueEventObjectReporterTest.java | 1 +
.../reporter/KeyValueMetricObjectReporterTest.java | 1 +
.../java/org/apache/gobblin/util/AvroUtils.java | 19 ++++++++++++++++++-
.../java/org/apache/gobblin/util/AvroUtilsTest.java | 21 +++++++++++++++++++++
6 files changed, 64 insertions(+), 15 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporter.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporter.java
index fb4e006..b97e48c 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporter.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporter.java
@@ -17,12 +17,14 @@
package org.apache.gobblin.metrics.reporter;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.StringJoiner;
+import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.tuple.Pair;
@@ -56,15 +58,11 @@ public class KeyValueEventObjectReporter extends
EventReporter {
protected List<String> keys;
protected final String randomKey;
protected KeyValuePusher pusher;
- protected Optional<Map<String, String>> namespaceOverride;
- protected final String topic;
+ protected final Schema schema;
public KeyValueEventObjectReporter(Builder builder) {
super(builder);
- this.topic = builder.topic;
- this.namespaceOverride = builder.namespaceOverride;
-
Config config = builder.config.get();
Config pusherConfig = ConfigUtils.getConfigOrEmpty(config,
PUSHER_CONFIG).withFallback(config);
String pusherClassName =
@@ -83,17 +81,24 @@ public class KeyValueEventObjectReporter extends
EventReporter {
"Key not assigned from config. Please set it with property {} Using
randomly generated number {} as key ",
ConfigurationKeys.METRICS_REPORTING_EVENTS_PUSHERKEYS, randomKey);
}
+
+ schema =
AvroUtils.overrideNameAndNamespace(GobblinTrackingEvent.getClassSchema(),
builder.topic, builder.namespaceOverride);
}
@Override
public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
- log.info("Emitting report using KeyValueEventObjectReporter");
List<Pair<String, GenericRecord>> events = Lists.newArrayList();
GobblinTrackingEvent event;
while (null != (event = queue.poll())) {
- GenericRecord record = AvroUtils.overrideNameAndNamespace(event,
this.topic, this.namespaceOverride);
+
+ GenericRecord record=event;
+ try {
+ record = AvroUtils.convertRecordSchema(event, schema);
+ } catch (IOException e){
+ log.error("Unable to generate generic data record", e);
+ }
events.add(Pair.of(buildKey(record), record));
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporter.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporter.java
index 96372c0..6b6309c 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporter.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporter.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Random;
import java.util.StringJoiner;
+import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.tuple.Pair;
@@ -36,7 +37,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.MetricReport;
import org.apache.gobblin.metrics.kafka.PusherUtils;
-import org.apache.gobblin.metrics.reporter.MetricReportReporter;
import org.apache.gobblin.util.AvroUtils;
import org.apache.gobblin.util.ConfigUtils;
@@ -57,15 +57,11 @@ public class KeyValueMetricObjectReporter extends
MetricReportReporter {
private List<String> keys;
protected final String randomKey;
protected KeyValuePusher pusher;
- private Optional<Map<String, String>> namespaceOverride;
- protected final String topic;
+ protected final Schema schema;
public KeyValueMetricObjectReporter(Builder builder, Config config) {
super(builder, config);
- this.topic = builder.topic;
- this.namespaceOverride = builder.namespaceOverride;
-
Config pusherConfig = ConfigUtils.getConfigOrEmpty(config,
PUSHER_CONFIG).withFallback(config);
String pusherClassName =
ConfigUtils.getString(config, PUSHER_CLASS,
PusherUtils.DEFAULT_KEY_VALUE_PUSHER_CLASS_NAME);
@@ -83,11 +79,19 @@ public class KeyValueMetricObjectReporter extends
MetricReportReporter {
"Key not assigned from config. Please set it with property {} Using
randomly generated number {} as key ",
ConfigurationKeys.METRICS_REPORTING_PUSHERKEYS, randomKey);
}
+
+ schema = AvroUtils.overrideNameAndNamespace(MetricReport.getClassSchema(),
builder.topic, builder.namespaceOverride);
}
+ /**
+ * Converts the metric report to the {@code schema} computed once in the
+ * constructor and pushes it through {@code this.pusher}, keyed by
+ * {@code buildKey(record)}. If schema conversion fails, the error is logged
+ * and the report is dropped (nothing is pushed for it).
+ */
@Override
protected void emitReport(MetricReport report) {
- GenericRecord record = AvroUtils.overrideNameAndNamespace(report,
this.topic, this.namespaceOverride);
+ // NOTE(review): this initial value is never observed — the try block either
+ // reassigns record or the catch block returns.
+ GenericRecord record = report;
+ try {
+ record = AvroUtils.convertRecordSchema(report, schema);
+ } catch (IOException e){
+ log.error("Unable to generate generic data record", e);
+ // Skip this report instead of pushing it with its original, un-overridden schema.
+ return;
+ }
this.pusher.pushKeyValueMessages(Lists.newArrayList(Pair.of(buildKey(record),
record)));
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporterTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporterTest.java
index a02ce3f..d2bf63f 100644
---
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporterTest.java
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueEventObjectReporterTest.java
@@ -106,6 +106,7 @@ public class KeyValueEventObjectReporterTest extends
KeyValueEventObjectReporter
Assert.assertEquals(retrievedEvent.getValue().get("name"), eventName);
int partition = Integer.parseInt(retrievedEvent.getKey());
Assert.assertTrue((0 <= partition && partition <= 99));
+ Assert.assertTrue(retrievedEvent.getValue().getSchema() ==
reporter.schema);
}
private static Pair<String, GenericRecord> nextKVEvent(Iterator<Pair<String,
GenericRecord>> it) {
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporterTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporterTest.java
index 32b9620..f674b0e 100644
---
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporterTest.java
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KeyValueMetricObjectReporterTest.java
@@ -95,6 +95,7 @@ public class KeyValueMetricObjectReporterTest extends
KeyValueMetricObjectReport
Assert.assertEquals(retrievedEvent.getValue().getSchema().getName(), name);
int partition = Integer.parseInt(retrievedEvent.getKey());
Assert.assertTrue((0 <= partition && partition <= 99));
+ Assert.assertTrue(retrievedEvent.getValue().getSchema() ==
reporter.schema);
reporter.close();
}
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 43f03f7..2ea7e10 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -899,7 +899,7 @@ public class AvroUtils {
* @param input input record who's name and namespace need to be overridden
* @param nameOverride new name for the record schema
* @param namespaceOverride Optional map containing namespace overrides
- * @return an output record with overriden name and possibly namespace
+ * @return an output record with overridden name and possibly namespace
*/
public static GenericRecord overrideNameAndNamespace(GenericRecord input,
String nameOverride, Optional<Map<String, String>> namespaceOverride) {
@@ -918,4 +918,21 @@ public class AvroUtils {
return output;
}
+ /**
+ * Given an input schema, override the name and namespace of the schema and
+ * return the resulting schema.
+ *
+ * <p>The name is always replaced via {@code switchName}. The namespace is
+ * rewritten via {@code switchNamespace} only when {@code namespaceOverride}
+ * is present. This is the schema-level counterpart of
+ * {@code overrideNameAndNamespace(GenericRecord, String, Optional)}.
+ *
+ * @param input the schema whose name and namespace are overridden
+ * @param nameOverride new name for the schema
+ * @param namespaceOverride optional map of namespace overrides passed to
+ * {@code switchNamespace}; when absent, only the name is changed
+ * @return a schema with overridden name and possibly namespace
+ */
+ public static Schema overrideNameAndNamespace(Schema input, String
nameOverride, Optional<Map<String, String>> namespaceOverride) {
+
+ Schema newSchema = switchName(input, nameOverride);
+ // Namespace rewriting is optional; without overrides, keep the renamed schema as-is.
+ if(namespaceOverride.isPresent()) {
+ newSchema = switchNamespace(newSchema, namespaceOverride.get());
+ }
+
+ return newSchema;
+ }
+
}
diff --git
a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
index 8031c7e..248f21f 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
@@ -514,4 +514,25 @@ public class AvroUtilsTest {
}
+ /**
+ * Verifies that the schema overload of {@code AvroUtils.overrideNameAndNamespace}
+ * replaces the record name and maps the namespace through the override map.
+ */
+ @Test
+ public void overrideSchemaNameAndNamespaceTest() {
+ String inputName = "input_name";
+ String inputNamespace = "input_namespace";
+ String outputName = "output_name";
+ String outputNamespace = "output_namespace";
+
+ // Minimal single-field record schema carrying the original name and namespace.
+ Schema inputSchema =
SchemaBuilder.record(inputName).namespace(inputNamespace).fields()
+ .name("integer1")
+ .type().intBuilder().endInt().noDefault()
+ .endRecord();
+
+ // Map the input namespace to the expected output namespace.
+ Map<String,String> namespaceOverrideMap = new HashMap<>();
+ namespaceOverrideMap.put(inputNamespace, outputNamespace);
+
+ Schema newSchema = AvroUtils.overrideNameAndNamespace(inputSchema,
outputName, Optional.of(namespaceOverrideMap));
+
+ Assert.assertEquals(newSchema.getName(), outputName);
+ Assert.assertEquals(newSchema.getNamespace(), outputNamespace);
+ }
+
}