Repository: incubator-gobblin Updated Branches: refs/heads/master 05bf034e3 -> 6b616d472
[GOBBLIN-245] Create topic specific extract of a WorkUnit in KafkaSource Closes #2095 from zxcware/kafka Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6b616d47 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6b616d47 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6b616d47 Branch: refs/heads/master Commit: 6b616d472f61047a47364def3f681e4735207b64 Parents: 05bf034 Author: zhchen <[email protected]> Authored: Thu Sep 7 18:09:24 2017 -0700 Committer: Hung Tran <[email protected]> Committed: Thu Sep 7 18:09:24 2017 -0700 ---------------------------------------------------------------------- .../extractor/extract/kafka/KafkaSource.java | 29 ++++++++++++++------ 1 file changed, 21 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b616d47/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java index af735d1..606be62 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java @@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory; import com.codahale.metrics.Timer; import com.google.common.base.Joiner; import com.google.common.base.Function; -import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; @@ -66,8 +65,6 @@ import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.dataset.DatasetUtils; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.metrics.MetricContext; -import org.apache.gobblin.source.extractor.limiter.LimiterConfigurationKeys; -import org.apache.gobblin.source.workunit.MultiWorkUnit; import lombok.Getter; import lombok.Setter; @@ -133,7 +130,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { private volatile boolean doneGettingAllPreviousOffsets = false; private Extract.TableType tableType; - private String extractNameSpace; + private String extractNamespace; private boolean isFullExtract; private boolean shouldEnableDatasetStateStore; private AtomicBoolean isDatasetStateEnabled = new AtomicBoolean(false); @@ -167,14 +164,15 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { String tableTypeStr = state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, KafkaSource.DEFAULT_TABLE_TYPE.toString()); tableType = Extract.TableType.valueOf(tableTypeStr); - extractNameSpace = + extractNamespace = state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, KafkaSource.DEFAULT_NAMESPACE_NAME); } else { // To be compatible, reject table type and namespace configuration keys as previous implementation tableType = KafkaSource.DEFAULT_TABLE_TYPE; - extractNameSpace = KafkaSource.DEFAULT_NAMESPACE_NAME; + extractNamespace = KafkaSource.DEFAULT_NAMESPACE_NAME; } isFullExtract = state.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY); + this.shouldEnableDatasetStateStore = state.getPropAsBoolean(GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE, DEFAULT_GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE); @@ -517,8 +515,22 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, Offsets offsets, Optional<State> topicSpecificState) { - Extract extract = this.createExtract(tableType, extractNameSpace, partition.getTopicName()); - if (isFullExtract) { + // Default to job level configurations + Extract.TableType currentTableType = tableType; + String currentExtractNamespace = extractNamespace; + boolean isCurrentFullExtract = isFullExtract; + // Update to topic specific configurations if any + if (topicSpecificState.isPresent()) { + State topicState = topicSpecificState.get(); + if (topicState.contains(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY)) { + currentTableType = Extract.TableType.valueOf(topicState.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY)); + } + currentExtractNamespace = topicState.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, extractNamespace); + isCurrentFullExtract = topicState.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY, isFullExtract); + } + + Extract extract = this.createExtract(currentTableType, currentExtractNamespace, partition.getTopicName()); + if (isCurrentFullExtract) { extract.setProp(ConfigurationKeys.EXTRACT_IS_FULL_KEY, true); } @@ -536,6 +548,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, offsets.getLatestOffset()); LOG.info(String.format("Created workunit for partition %s: lowWatermark=%d, highWatermark=%d, range=%d", partition, offsets.getStartOffset(), offsets.getLatestOffset(), offsets.getLatestOffset() - offsets.getStartOffset())); + return workUnit; }
