This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ae62669 [GOBBLIN-970] Pass metric context from the KafkaSource to the
KafkaWorkUnitPacker
ae62669 is described below
commit ae62669d31f89a9b62e520f1ca3969536c36db19
Author: sv2000 <[email protected]>
AuthorDate: Thu Nov 21 11:08:58 2019 -0800
[GOBBLIN-970] Pass metric context from the KafkaSource to the KafkaWorkUnitPacker
Closes #2815 from sv2000/metricContextPacker
---
.../source/extractor/extract/kafka/KafkaSource.java | 2 +-
.../kafka/workunit/packer/KafkaWorkUnitPacker.java | 20 +++++++++++++++-----
.../workunit/packer/KafkaWorkUnitPackerTest.java | 7 ++++---
.../runtime/StateStoreBasedWatermarkStorage.java | 2 +-
4 files changed, 21 insertions(+), 10 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 5fa53ea..ac10acd 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -263,7 +263,7 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
//determine the number of mappers
int maxMapperNum =
state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY,
ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
- KafkaWorkUnitPacker kafkaWorkUnitPacker =
KafkaWorkUnitPacker.getInstance(this, state);
+ KafkaWorkUnitPacker kafkaWorkUnitPacker =
KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext));
int numOfMultiWorkunits = maxMapperNum;
if(state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) {
double totalEstDataSize =
kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits);
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
index 31e30d7..1d34aec 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
@@ -23,7 +23,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +36,7 @@ import com.google.common.primitives.Doubles;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.source.extractor.WatermarkInterval;
import org.apache.gobblin.source.extractor.extract.AbstractSource;
@@ -46,6 +46,7 @@ import
org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;
import org.apache.gobblin.source.extractor.extract.kafka.MultiLongWatermark;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
/**
@@ -368,18 +369,23 @@ public abstract class KafkaWorkUnitPacker {
}
public static KafkaWorkUnitPacker getInstance(AbstractSource<?, ?> source,
SourceState state) {
+ return getInstance(source, state, Optional.absent());
+ }
+
+ public static KafkaWorkUnitPacker getInstance(AbstractSource<?, ?> source,
SourceState state,
+ Optional<MetricContext> metricContext) {
if (state.contains(KAFKA_WORKUNIT_PACKER_TYPE)) {
String packerTypeStr = state.getProp(KAFKA_WORKUNIT_PACKER_TYPE);
Optional<PackerType> packerType = Enums.getIfPresent(PackerType.class,
packerTypeStr);
if (packerType.isPresent()) {
- return getInstance(packerType.get(), source, state);
+ return getInstance(packerType.get(), source, state, metricContext);
}
throw new IllegalArgumentException("WorkUnit packer type " +
packerTypeStr + " not found");
}
- return getInstance(DEFAULT_PACKER_TYPE, source, state);
+ return getInstance(DEFAULT_PACKER_TYPE, source, state, metricContext);
}
- public static KafkaWorkUnitPacker getInstance(PackerType packerType,
AbstractSource<?, ?> source, SourceState state) {
+ public static KafkaWorkUnitPacker getInstance(PackerType packerType,
AbstractSource<?, ?> source, SourceState state, Optional<MetricContext>
metricContext) {
switch (packerType) {
case SINGLE_LEVEL:
return new KafkaSingleLevelWorkUnitPacker(source, state);
@@ -388,7 +394,11 @@ public abstract class KafkaWorkUnitPacker {
case CUSTOM:
Preconditions.checkArgument(state.contains(KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE));
String className =
state.getProp(KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE);
- return
GobblinConstructorUtils.invokeConstructor(KafkaWorkUnitPacker.class, className,
source, state);
+ try {
+ return (KafkaWorkUnitPacker)
GobblinConstructorUtils.invokeLongestConstructor(Class.forName(className),
source, state, metricContext);
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException(e);
+ }
default:
throw new IllegalArgumentException("WorkUnit packer type " +
packerType + " not found");
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java
index e392566..89a81d0 100644
---
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java
@@ -20,14 +20,15 @@ package
org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;
import java.util.List;
import java.util.Map;
-import org.apache.gobblin.configuration.SourceState;
-import org.apache.gobblin.source.extractor.extract.AbstractSource;
-import org.apache.gobblin.source.workunit.WorkUnit;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.source.extractor.extract.AbstractSource;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
import static
org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE;
import static
org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_PACKER_TYPE;
import static
org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_SIZE_ESTIMATOR_CUSTOMIZED_TYPE;
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
index 651a3ce..2a32f42 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
@@ -137,7 +137,7 @@ public class StateStoreBasedWatermarkStorage implements
WatermarkStorage {
return committed;
}
- Iterable<CheckpointableWatermarkState> getAllCommittedWatermarks() throws
IOException {
+ public Iterable<CheckpointableWatermarkState> getAllCommittedWatermarks()
throws IOException {
return _stateStore.getAll(_storeName);
}