This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a273278 [GOBBLIN-733] Instrument Avro Converters to allow converter
metrics emission in both batch and streaming modes.[]
a273278 is described below
commit a273278825703bca02701d9c49ef5d3e459f3ba5
Author: sv2000 <[email protected]>
AuthorDate: Fri Apr 12 15:03:10 2019 -0700
[GOBBLIN-733] Instrument Avro Converters to allow converter metrics
emission in both batch and streaming modes.[]
Closes #2600 from sv2000/instrumentedConverter
---
.../java/org/apache/gobblin/converter/AvroToAvroConverterBase.java | 7 ++-----
.../apache/gobblin/converter/filter/AvroProjectionConverter.java | 2 +-
.../filter/GobblinTrackingEventFlattenFilterConverter.java | 3 +--
.../apache/gobblin/converter/filter/AvroFieldsPickConverter.java | 2 +-
.../org/apache/gobblin/converter/filter/AvroFilterConverter.java | 2 +-
5 files changed, 6 insertions(+), 10 deletions(-)
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AvroToAvroConverterBase.java b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AvroToAvroConverterBase.java
index 215a7c4..12aa4d0 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AvroToAvroConverterBase.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/AvroToAvroConverterBase.java
@@ -21,18 +21,15 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.instrumented.converter.InstrumentedConverter;
/**
* A base abstract {@link Converter} class for data transformation from Avro to Avro.
*/
-public abstract class AvroToAvroConverterBase extends Converter<Schema, Schema, GenericRecord, GenericRecord> {
+public abstract class AvroToAvroConverterBase extends InstrumentedConverter<Schema, Schema, GenericRecord, GenericRecord> {
@Override
public abstract Schema convertSchema(Schema inputSchema, WorkUnitState workUnit)
throws SchemaConversionException;
-
- @Override
- public abstract Iterable<GenericRecord> convertRecord(Schema outputSchema, GenericRecord inputRecord, WorkUnitState workUnit)
- throws DataConversionException;
}
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroProjectionConverter.java b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroProjectionConverter.java
index 6961ab5..1533f94 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroProjectionConverter.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroProjectionConverter.java
@@ -79,7 +79,7 @@ public class AvroProjectionConverter extends AvroToAvroConverterBase {
* Convert the schema of inputRecord to outputSchema.
*/
@Override
- public Iterable<GenericRecord> convertRecord(Schema outputSchema, GenericRecord inputRecord, WorkUnitState workUnit)
+ public Iterable<GenericRecord> convertRecordImpl(Schema outputSchema, GenericRecord inputRecord, WorkUnitState workUnit)
throws DataConversionException {
try {
return new SingleRecordIterable<>(AvroUtils.convertRecordSchema(inputRecord, outputSchema));
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
index a1d6b64..d393a8c 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
@@ -24,7 +24,6 @@ import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
-import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -121,7 +120,7 @@ public class GobblinTrackingEventFlattenFilterConverter extends AvroToAvroConver
}
@Override
- public Iterable<GenericRecord> convertRecord(Schema outputSchema, GenericRecord inputRecord, WorkUnitState workUnit)
+ public Iterable<GenericRecord> convertRecordImpl(Schema outputSchema, GenericRecord inputRecord, WorkUnitState workUnit)
throws DataConversionException {
GenericRecord genericRecord = new GenericData.Record(outputSchema);
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java
index 74ed3f3..6f341fe 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java
@@ -246,7 +246,7 @@ public class AvroFieldsPickConverter extends AvroToAvroConverterBase {
}
@Override
- public Iterable<GenericRecord> convertRecord(Schema outputSchema, GenericRecord inputRecord, WorkUnitState workUnit)
+ public Iterable<GenericRecord> convertRecordImpl(Schema outputSchema, GenericRecord inputRecord, WorkUnitState workUnit)
throws DataConversionException {
try {
return new SingleRecordIterable<>(AvroUtils.convertRecordSchema(inputRecord, outputSchema));
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFilterConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFilterConverter.java
index d859888..810419d 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFilterConverter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFilterConverter.java
@@ -81,7 +81,7 @@ public class AvroFilterConverter extends AvroToAvroConverterBase {
* @see org.apache.gobblin.converter.AvroToAvroConverterBase#convertRecord(org.apache.avro.Schema, org.apache.avro.generic.GenericRecord, org.apache.gobblin.configuration.WorkUnitState)
*/
@Override
- public Iterable<GenericRecord> convertRecord(Schema outputSchema, GenericRecord inputRecord, WorkUnitState workUnit)
+ public Iterable<GenericRecord> convertRecordImpl(Schema outputSchema, GenericRecord inputRecord, WorkUnitState workUnit)
throws DataConversionException {
Optional<Object> fieldValue = AvroUtils.getFieldValue(inputRecord, this.fieldName);
if (fieldValue.isPresent() && fieldValue.get().toString().equals(this.fieldValue)) {