This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a1061e4 [GOBBLIN-887] Generalize UniversalKafkaSource to accept
Extractors that do not extend KafkaExtractor
a1061e4 is described below
commit a1061e4ae3b296f5978cdabf79c550687329a35c
Author: autumnust <[email protected]>
AuthorDate: Thu Sep 26 08:47:02 2019 -0700
[GOBBLIN-887] Generalize UniversalKafkaSource to accept Extractors that do not
extend KafkaExtractor
Closes #2741 from autumnust/KafkaExtractor
---
.../extractor/extract/kafka/UniversalKafkaSource.java | 15 +++++++--------
1 file changed, 7 insertions(+), 8 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/UniversalKafkaSource.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/UniversalKafkaSource.java
index 87714d7..b227636 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/UniversalKafkaSource.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/UniversalKafkaSource.java
@@ -19,19 +19,17 @@ package org.apache.gobblin.source.extractor.extract.kafka;
import java.io.IOException;
-import com.google.common.base.Preconditions;
-
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.extract.EventBasedExtractor;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-import lombok.AllArgsConstructor;
-import lombok.Getter;
+import com.google.common.base.Preconditions;
/**
- * A {@link KafkaSource} to use with arbitrary {@link KafkaExtractor}. Specify
the extractor to use with key
+ * A {@link KafkaSource} to use with arbitrary {@link EventBasedExtractor}.
Specify the extractor to use with key
* {@link #EXTRACTOR_TYPE}.
*/
public class UniversalKafkaSource<S, D> extends KafkaSource<S, D> {
@@ -39,12 +37,13 @@ public class UniversalKafkaSource<S, D> extends
KafkaSource<S, D> {
public static final String EXTRACTOR_TYPE =
"gobblin.source.kafka.extractorType";
@Override
- public Extractor<S, D> getExtractor(WorkUnitState state) throws IOException {
+ public Extractor<S, D> getExtractor(WorkUnitState state)
+ throws IOException {
Preconditions.checkArgument(state.contains(EXTRACTOR_TYPE), "Missing key "
+ EXTRACTOR_TYPE);
try {
- ClassAliasResolver<KafkaExtractor> aliasResolver = new
ClassAliasResolver<>(KafkaExtractor.class);
- Class<? extends KafkaExtractor> klazz =
aliasResolver.resolveClass(state.getProp(EXTRACTOR_TYPE));
+ ClassAliasResolver<EventBasedExtractor> aliasResolver = new
ClassAliasResolver<>(EventBasedExtractor.class);
+ Class<? extends EventBasedExtractor> klazz =
aliasResolver.resolveClass(state.getProp(EXTRACTOR_TYPE));
return GobblinConstructorUtils.invokeLongestConstructor(klazz, state);
} catch (ReflectiveOperationException e) {