Repository: incubator-gobblin Updated Branches: refs/heads/master 8f32ab4c1 -> 0ee3cfdca
[GOBBLIN-292] Add kafka09 support for service and cluster job spec communication Add pluggable consumer client to SimpleKafkaSpecConsumer. Add pluggable producer to SimpleKafkaSpecProducer. Move common logic into the gobblin-service-kafka module. Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/590b872b Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/590b872b Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/590b872b Branch: refs/heads/master Commit: 590b872b094b5a27b2a12e2bc66d1a7514309bb5 Parents: 626d312 Author: Hung Tran <[email protected]> Authored: Thu Oct 19 13:09:35 2017 -0700 Committer: Hung Tran <[email protected]> Committed: Thu Oct 19 13:09:35 2017 -0700 ---------------------------------------------------------------------- gobblin-modules/gobblin-kafka-08/build.gradle | 1 + .../service/SimpleKafkaSpecConsumer.java | 264 ----------------- .../service/SimpleKafkaSpecExecutor.java | 105 ------- .../service/SimpleKafkaSpecProducer.java | 140 --------- .../service/StreamingKafkaSpecConsumer.java | 173 ----------- .../kafka/client/Kafka09ConsumerClient.java | 19 +- .../client/AbstractBaseKafkaConsumerClient.java | 3 +- .../gobblin-service-kafka/build.gradle | 44 +++ .../service/AvroJobSpecDeserializer.java | 70 +++++ .../service/SimpleKafkaSpecConsumer.java | 287 +++++++++++++++++++ .../service/SimpleKafkaSpecExecutor.java | 102 +++++++ .../service/SimpleKafkaSpecProducer.java | 157 ++++++++++ .../service/StreamingKafkaSpecConsumer.java | 175 +++++++++++ 13 files changed, 855 insertions(+), 685 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-kafka-08/build.gradle ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/build.gradle b/gobblin-modules/gobblin-kafka-08/build.gradle index b72ce3a..bc20884 100644 --- a/gobblin-modules/gobblin-kafka-08/build.gradle +++ b/gobblin-modules/gobblin-kafka-08/build.gradle @@ -56,6 +56,7 @@ dependencies { runtime externalDependency.confluentSchemaRegistryClient runtime externalDependency.protobuf + testCompile project(":gobblin-modules:gobblin-service-kafka") testCompile project(":gobblin-runtime") testCompile project(":gobblin-test-utils") testCompile externalDependency.jsonAssert http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java deleted file mode 100644 index 083ccf3..0000000 --- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.service; - -import java.io.ByteArrayInputStream; -import java.io.Closeable; -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.Future; -import java.util.regex.Pattern; - -import com.google.common.base.Optional; -import com.google.common.collect.Lists; -import com.typesafe.config.Config; - -import org.apache.avro.io.BinaryDecoder; -import org.apache.avro.io.Decoder; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.specific.SpecificDatumReader; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.apache.gobblin.runtime.api.SpecExecutor; -import org.apache.gobblin.kafka.client.ByteArrayBasedKafkaRecord; -import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; -import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient; -import org.apache.gobblin.kafka.client.Kafka08ConsumerClient; -import org.apache.gobblin.kafka.client.KafkaConsumerRecord; -import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter; -import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter; -import org.apache.gobblin.runtime.api.JobSpec; -import org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.runtime.api.SpecConsumer; -import org.apache.gobblin.runtime.job_spec.AvroJobSpec; -import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException; -import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition; -import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic; -import org.apache.gobblin.util.CompletedFuture; -import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.*; - -import lombok.extern.slf4j.Slf4j; - - -@Slf4j -public class SimpleKafkaSpecConsumer implements SpecConsumer<Spec>, Closeable { - - // Consumer - protected final GobblinKafkaConsumerClient _kafka08Consumer; - protected final List<KafkaPartition> _partitions; - protected final List<Long> _lowWatermark; - protected final List<Long> _nextWatermark; - protected final List<Long> _highWatermark; - - private Iterator<KafkaConsumerRecord> messageIterator = null; - private int currentPartitionIdx = -1; - private boolean isFirstRun = true; - - private final BinaryDecoder _decoder; - private final SpecificDatumReader<AvroJobSpec> _reader; - private final SchemaVersionWriter<?> _versionWriter; - - public SimpleKafkaSpecConsumer(Config config, Optional<Logger> log) { - - // Consumer - _kafka08Consumer = new Kafka08ConsumerClient.Factory().create(config); - List<KafkaTopic> kafkaTopics = _kafka08Consumer.getFilteredTopics(Collections.EMPTY_LIST, - Lists.newArrayList(Pattern.compile(config.getString(SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY)))); - _partitions = kafkaTopics.get(0).getPartitions(); - _lowWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L)); - 
_nextWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L)); - _highWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L)); - - InputStream dummyInputStream = new ByteArrayInputStream(new byte[0]); - _decoder = DecoderFactory.get().binaryDecoder(dummyInputStream, null); - _reader = new SpecificDatumReader<AvroJobSpec>(AvroJobSpec.SCHEMA$); - _versionWriter = new FixedSchemaVersionWriter(); - } - - public SimpleKafkaSpecConsumer(Config config, Logger log) { - this(config, Optional.of(log)); - } - - /** Constructor with no logging */ - public SimpleKafkaSpecConsumer(Config config) { - this(config, Optional.<Logger>absent()); - } - - @Override - public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() { - List<Pair<SpecExecutor.Verb, Spec>> changesSpecs = new ArrayList<>(); - initializeWatermarks(); - this.currentPartitionIdx = -1; - while (!allPartitionsFinished()) { - if (currentPartitionFinished()) { - moveToNextPartition(); - continue; - } - if (this.messageIterator == null || !this.messageIterator.hasNext()) { - try { - this.messageIterator = fetchNextMessageBuffer(); - } catch (Exception e) { - log.error(String.format("Failed to fetch next message buffer for partition %s. Will skip this partition.", - getCurrentPartition()), e); - moveToNextPartition(); - continue; - } - if (this.messageIterator == null || !this.messageIterator.hasNext()) { - moveToNextPartition(); - continue; - } - } - while (!currentPartitionFinished()) { - if (!this.messageIterator.hasNext()) { - break; - } - - KafkaConsumerRecord nextValidMessage = this.messageIterator.next(); - - // Even though we ask Kafka to give us a message buffer starting from offset x, it may - // return a buffer that starts from offset smaller than x, so we need to skip messages - // until we get to x. - if (nextValidMessage.getOffset() < _nextWatermark.get(this.currentPartitionIdx)) { - continue; - } - - _nextWatermark.set(this.currentPartitionIdx, nextValidMessage.getNextOffset()); - try { - final AvroJobSpec record; - - if (nextValidMessage instanceof ByteArrayBasedKafkaRecord) { - record = decodeRecord((ByteArrayBasedKafkaRecord)nextValidMessage); - } else if (nextValidMessage instanceof DecodeableKafkaRecord){ - record = ((DecodeableKafkaRecord<?, AvroJobSpec>) nextValidMessage).getValue(); - } else { - throw new IllegalStateException( - "Unsupported KafkaConsumerRecord type. 
The returned record can either be ByteArrayBasedKafkaRecord" - + " or DecodeableKafkaRecord"); - } - - JobSpec.Builder jobSpecBuilder = JobSpec.builder(record.getUri()); - - Properties props = new Properties(); - props.putAll(record.getProperties()); - jobSpecBuilder.withJobCatalogURI(record.getUri()).withVersion(record.getVersion()) - .withDescription(record.getDescription()).withConfigAsProperties(props); - - if (!record.getTemplateUri().isEmpty()) { - jobSpecBuilder.withTemplate(new URI(record.getTemplateUri())); - } - - String verbName = record.getMetadata().get(VERB_KEY); - Verb verb = Verb.valueOf(verbName); - - changesSpecs.add(new ImmutablePair<Verb, Spec>(verb, jobSpecBuilder.build())); - } catch (Throwable t) { - log.error("Could not decode record at partition " + this.currentPartitionIdx + - " offset " + nextValidMessage.getOffset()); - } - } - } - - return new CompletedFuture(changesSpecs, null); - } - - private void initializeWatermarks() { - initializeLowWatermarks(); - initializeHighWatermarks(); - } - - private void initializeLowWatermarks() { - try { - int i=0; - for (KafkaPartition kafkaPartition : _partitions) { - if (isFirstRun) { - long earliestOffset = _kafka08Consumer.getEarliestOffset(kafkaPartition); - _lowWatermark.set(i, earliestOffset); - } else { - _lowWatermark.set(i, _highWatermark.get(i)); - } - i++; - } - isFirstRun = false; - } catch (KafkaOffsetRetrievalFailureException e) { - throw new RuntimeException(e); - } - } - - private void initializeHighWatermarks() { - try { - int i=0; - for (KafkaPartition kafkaPartition : _partitions) { - long latestOffset = _kafka08Consumer.getLatestOffset(kafkaPartition); - _highWatermark.set(i, latestOffset); - i++; - } - } catch (KafkaOffsetRetrievalFailureException e) { - throw new RuntimeException(e); - } - } - - private boolean allPartitionsFinished() { - return this.currentPartitionIdx >= _nextWatermark.size(); - } - - private boolean currentPartitionFinished() { - if (this.currentPartitionIdx == -1) { - return true; - } else if (_nextWatermark.get(this.currentPartitionIdx) >= _highWatermark.get(this.currentPartitionIdx)) { - return true; - } else { - return false; - } - } - - private int moveToNextPartition() { - this.messageIterator = null; - return this.currentPartitionIdx ++; - } - - private KafkaPartition getCurrentPartition() { - return _partitions.get(this.currentPartitionIdx); - } - - private Iterator<KafkaConsumerRecord> fetchNextMessageBuffer() { - return _kafka08Consumer.consume(_partitions.get(this.currentPartitionIdx), - _nextWatermark.get(this.currentPartitionIdx), _highWatermark.get(this.currentPartitionIdx)); - } - - private AvroJobSpec decodeRecord(ByteArrayBasedKafkaRecord kafkaConsumerRecord) throws IOException { - InputStream is = new ByteArrayInputStream(kafkaConsumerRecord.getMessageBytes()); - _versionWriter.readSchemaVersioningInformation(new DataInputStream(is)); - - Decoder decoder = DecoderFactory.get().binaryDecoder(is, _decoder); - - return _reader.read(null, decoder); - } - - @Override - public void close() throws IOException { - _kafka08Consumer.close(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java 
b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java deleted file mode 100644 index 8545bf6..0000000 --- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.service; - -import java.io.Serializable; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.concurrent.Future; - -import com.google.common.base.Optional; -import com.typesafe.config.Config; -import com.google.common.io.Closer; - -import org.slf4j.Logger; -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.util.CompletedFuture; -import org.apache.gobblin.util.ConfigUtils; -import org.apache.gobblin.runtime.api.SpecExecutor; -import org.apache.gobblin.runtime.api.SpecConsumer; -import org.apache.gobblin.runtime.api.SpecProducer; -import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor; - -/** - * An {@link SpecExecutor} that use Kafka as the communication mechanism. - */ -public class SimpleKafkaSpecExecutor extends AbstractSpecExecutor { - public static final String SPEC_KAFKA_TOPICS_KEY = "spec.kafka.topics"; - - - protected static final String VERB_KEY = "Verb"; - - private SpecProducer<Spec> specProducer; - - public SimpleKafkaSpecExecutor(Config config, Optional<Logger> log) { - super(config, log); - specProducer = new SimpleKafkaSpecProducer(config, log); - } - - /** - * Constructor with no logging, necessary for simple use case. - * @param config - */ - public SimpleKafkaSpecExecutor(Config config) { - this(config, Optional.absent()); - } - - @Override - public Future<? 
extends SpecProducer> getProducer() { - return new CompletedFuture<>(this.specProducer, null); - } - - @Override - public Future<String> getDescription() { - return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + specExecutorInstanceUri, null); - } - - @Override - protected void startUp() throws Exception { - optionalCloser = Optional.of(Closer.create()); - specProducer = optionalCloser.get().register((SimpleKafkaSpecProducer) specProducer); - } - - @Override - protected void shutDown() throws Exception { - if (optionalCloser.isPresent()) { - optionalCloser.get().close(); - } else { - log.warn("There's no Closer existed in " + this.getClass().getName()); - } - } - - public static class SpecExecutorInstanceDataPacket implements Serializable { - - protected Verb _verb; - protected URI _uri; - protected Spec _spec; - - public SpecExecutorInstanceDataPacket(Verb verb, URI uri, Spec spec) { - _verb = verb; - _uri = uri; - _spec = spec; - } - - @Override - public String toString() { - return String.format("Verb: %s, URI: %s, Spec: %s", _verb, _uri, _spec); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java deleted file mode 100644 index 13aae6f..0000000 --- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gobblin.service; - -import java.io.Closeable; -import java.io.IOException; -import java.net.URI; -import java.util.List; -import java.util.concurrent.Future; -import javax.annotation.concurrent.NotThreadSafe; - -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import com.typesafe.config.Config; - -import org.slf4j.Logger; -import org.apache.gobblin.runtime.api.SpecExecutor; -import org.apache.gobblin.runtime.api.SpecProducer; -import org.apache.gobblin.kafka.writer.Kafka08DataWriter; -import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer; -import org.apache.gobblin.metrics.reporter.util.AvroSerializer; -import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter; -import org.apache.gobblin.runtime.api.JobSpec; -import org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.runtime.job_spec.AvroJobSpec; -import org.apache.gobblin.util.ConfigUtils; -import org.apache.gobblin.writer.WriteCallback; -import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.*; - -import lombok.extern.slf4j.Slf4j; - -@Slf4j -@NotThreadSafe -public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable { - - // Producer - protected Kafka08DataWriter<byte[]> _kafka08Producer; - private final AvroSerializer<AvroJobSpec> _serializer; - private Config _config; - - public SimpleKafkaSpecProducer(Config config, Optional<Logger> log) { - - try { - _serializer = new AvroBinarySerializer<>(AvroJobSpec.SCHEMA$, new FixedSchemaVersionWriter()); - _config = config; - } catch (IOException e) { - throw new RuntimeException("Could not create AvroBinarySerializer", e); - } - } - - public SimpleKafkaSpecProducer(Config config, Logger log) { - this(config, Optional.of(log)); - } - - /** Constructor with no logging */ - public SimpleKafkaSpecProducer(Config config) { - this(config, Optional.<Logger>absent()); - } - - @Override - public Future<?> addSpec(Spec addedSpec) { - AvroJobSpec avroJobSpec = convertToAvroJobSpec(addedSpec, SpecExecutor.Verb.ADD); - - log.info("Adding Spec: " + addedSpec + " using Kafka."); - - return getKafka08Producer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY); - } - - @Override - public Future<?> updateSpec(Spec updatedSpec) { - AvroJobSpec avroJobSpec = convertToAvroJobSpec(updatedSpec, SpecExecutor.Verb.UPDATE); - - log.info("Updating Spec: " + updatedSpec + " using Kafka."); - - return getKafka08Producer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY); - } - - @Override - public Future<?> deleteSpec(URI deletedSpecURI) { - - AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString()) - .setMetadata(ImmutableMap.of(VERB_KEY, Verb.DELETE.name())).build(); - - log.info("Deleting Spec: " + deletedSpecURI + " using Kafka."); - - return getKafka08Producer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY); - } - - @Override - public Future<? 
extends List<Spec>> listSpecs() { - throw new UnsupportedOperationException(); - } - - @Override - public void close() throws IOException { - _kafka08Producer.close(); - } - - private Kafka08DataWriter<byte[]> getKafka08Producer() { - if (null == _kafka08Producer) { - _kafka08Producer = new Kafka08DataWriter<byte[]>(ConfigUtils.configToProperties(_config)); - } - return _kafka08Producer; - } - - private AvroJobSpec convertToAvroJobSpec(Spec spec, Verb verb) { - if (spec instanceof JobSpec) { - JobSpec jobSpec = (JobSpec) spec; - AvroJobSpec.Builder avroJobSpecBuilder = AvroJobSpec.newBuilder(); - - avroJobSpecBuilder.setUri(jobSpec.getUri().toString()).setVersion(jobSpec.getVersion()) - .setDescription(jobSpec.getDescription()).setProperties(Maps.fromProperties(jobSpec.getConfigAsProperties())) - .setMetadata(ImmutableMap.of(VERB_KEY, verb.name())); - - if (jobSpec.getTemplateURI().isPresent()) { - avroJobSpecBuilder.setTemplateUri(jobSpec.getTemplateURI().get().toString()); - } - - return avroJobSpecBuilder.build(); - } else { - throw new RuntimeException("Unsupported spec type " + spec.getClass()); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java deleted file mode 100644 index fd42211..0000000 --- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gobblin.service; - -import java.io.Closeable; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; - -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableMap; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.google.common.util.concurrent.AbstractIdleService; - -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.apache.gobblin.runtime.api.JobSpec; -import org.apache.gobblin.runtime.api.MutableJobCatalog; -import org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.runtime.api.SpecConsumer; -import org.apache.gobblin.runtime.job_monitor.AvroJobSpecKafkaJobMonitor; -import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor; -import org.apache.gobblin.runtime.std.DefaultJobCatalogListenerImpl; -import org.apache.gobblin.util.CompletedFuture; -import org.apache.gobblin.util.ConfigUtils; -import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.*; - -import lombok.extern.slf4j.Slf4j; - -@Slf4j -/** - * SpecConsumer that consumes from kafka in a streaming manner - * Implemented {@link AbstractIdleService} for starting up and shutting down. - */ -public class StreamingKafkaSpecConsumer extends AbstractIdleService implements SpecConsumer<Spec>, Closeable { - public static final String SPEC_STREAMING_BLOCKING_QUEUE_SIZE = "spec.StreamingBlockingQueueSize"; - private static final int DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE = 100; - private final AvroJobSpecKafkaJobMonitor _jobMonitor; - private final BlockingQueue<ImmutablePair<Verb, Spec>> _jobSpecQueue; - - public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Optional<Logger> log) { - String topic = config.getString(SPEC_KAFKA_TOPICS_KEY); - Config defaults = ConfigFactory.parseMap(ImmutableMap.of(AvroJobSpecKafkaJobMonitor.TOPIC_KEY, topic, - KafkaJobMonitor.KAFKA_AUTO_OFFSET_RESET_KEY, KafkaJobMonitor.KAFKA_AUTO_OFFSET_RESET_SMALLEST)); - - try { - _jobMonitor = (AvroJobSpecKafkaJobMonitor)(new AvroJobSpecKafkaJobMonitor.Factory()) - .forConfig(config.withFallback(defaults), jobCatalog); - } catch (IOException e) { - throw new RuntimeException("Could not create job monitor", e); - } - - _jobSpecQueue = new LinkedBlockingQueue<>(ConfigUtils.getInt(config, "SPEC_STREAMING_BLOCKING_QUEUE_SIZE", - DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE)); - - // listener will add job specs to a blocking queue to send to callers of changedSpecs() - jobCatalog.addListener(new JobSpecListener()); - } - - public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Logger log) { - this(config, jobCatalog, Optional.of(log)); - } - - /** Constructor with no logging */ - public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog) { - this(config, jobCatalog, Optional.<Logger>absent()); - } - - /** - * This method returns job specs receive from Kafka. It will block if there are no job specs. - * @return list of (verb, jobspecs) pairs. - */ - @Override - public Future<? 
extends List<Pair<Verb, Spec>>> changedSpecs() { - List<Pair<Verb, Spec>> changesSpecs = new ArrayList<>(); - - try { - Pair<Verb, Spec> specPair = _jobSpecQueue.take(); - - do { - changesSpecs.add(specPair); - - // if there are more elements then pass them along in this call - specPair = _jobSpecQueue.poll(); - } while (specPair != null); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - - return new CompletedFuture(changesSpecs, null); - } - - @Override - protected void startUp() { - _jobMonitor.startAsync().awaitRunning(); - } - - @Override - protected void shutDown() { - _jobMonitor.stopAsync().awaitTerminated(); - } - - @Override - public void close() throws IOException { - shutDown(); - } - - /** - * JobCatalog listener that puts messages into a blocking queue for consumption by changedSpecs method of - * {@link StreamingKafkaSpecConsumer} - */ - protected class JobSpecListener extends DefaultJobCatalogListenerImpl { - public JobSpecListener() { - super(StreamingKafkaSpecConsumer.this.log); - } - - @Override public void onAddJob(JobSpec addedJob) { - super.onAddJob(addedJob); - - try { - _jobSpecQueue.put(new ImmutablePair<Verb, Spec>(Verb.ADD, addedJob)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - @Override public void onDeleteJob(URI deletedJobURI, String deletedJobVersion) { - super.onDeleteJob(deletedJobURI, deletedJobVersion); - try { - JobSpec.Builder jobSpecBuilder = JobSpec.builder(deletedJobURI); - - Properties props = new Properties(); - jobSpecBuilder.withVersion(deletedJobVersion).withConfigAsProperties(props); - - _jobSpecQueue.put(new ImmutablePair<Verb, Spec>(Verb.DELETE, jobSpecBuilder.build())); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - @Override public void onUpdateJob(JobSpec updatedJob) { - super.onUpdateJob(updatedJob); - - try { - _jobSpecQueue.put(new ImmutablePair<Verb, Spec>(Verb.UPDATE, updatedJob)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java index b6cd35d..9b952ab 100644 --- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java +++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java @@ -17,6 +17,7 @@ package org.apache.gobblin.kafka.client; import java.io.IOException; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; @@ -81,6 +82,9 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient super(config); Preconditions.checkArgument(config.hasPath(GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY), "Missing required property " + GOBBLIN_CONFIG_KEY_DESERIALIZER_CLASS_KEY); + + Config scopedConfig = ConfigUtils.getConfigOrEmpty(config, AbstractBaseKafkaConsumerClient.CONFIG_PREFIX_NO_DOT); + Properties props = new Properties(); props.put(KAFKA_09_CLIENT_BOOTSTRAP_SERVERS_KEY, Joiner.on(",").join(super.brokers)); 
props.put(KAFKA_09_CLIENT_ENABLE_AUTO_COMMIT_KEY, KAFKA_09_DEFAULT_ENABLE_AUTO_COMMIT); @@ -89,6 +93,9 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient ConfigUtils.getString(config, GOBBLIN_CONFIG_KEY_DESERIALIZER_CLASS_KEY, KAFKA_09_DEFAULT_KEY_DESERIALIZER)); props.put(KAFKA_09_CLIENT_VALUE_DESERIALIZER_CLASS_KEY, config.getString(GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY)); + + props.putAll(ConfigUtils.configToProperties(scopedConfig)); + this.consumer = new KafkaConsumer<>(props); } @@ -112,12 +119,20 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient @Override public long getEarliestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException { - throw new UnsupportedOperationException("getEarliestOffset and getLatestOffset is not supported by Kafka-09"); + TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId()); + this.consumer.assign(Collections.singletonList(topicPartition)); + this.consumer.seekToBeginning(topicPartition); + + return this.consumer.position(topicPartition); } @Override public long getLatestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException { - throw new UnsupportedOperationException("getEarliestOffset and getLatestOffset is not supported by Kafka-09"); + TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId()); + this.consumer.assign(Collections.singletonList(topicPartition)); + this.consumer.seekToEnd(topicPartition); + + return this.consumer.position(topicPartition); } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java index 00b751b..638a879 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java @@ -41,7 +41,8 @@ import javax.annotation.Nullable; */ public abstract class AbstractBaseKafkaConsumerClient implements GobblinKafkaConsumerClient { - public static final String CONFIG_PREFIX = "source.kafka."; + public static final String CONFIG_PREFIX_NO_DOT = "source.kafka"; + public static final String CONFIG_PREFIX = CONFIG_PREFIX_NO_DOT + "."; public static final String CONFIG_KAFKA_FETCH_TIMEOUT_VALUE = CONFIG_PREFIX + "fetchTimeoutMillis"; public static final int CONFIG_KAFKA_FETCH_TIMEOUT_VALUE_DEFAULT = 1000; // 1 second public static final String CONFIG_KAFKA_FETCH_REQUEST_MIN_BYTES = CONFIG_PREFIX + "fetchMinBytes"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-service-kafka/build.gradle ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-service-kafka/build.gradle b/gobblin-modules/gobblin-service-kafka/build.gradle new file mode 100644 index 0000000..e4d1146 --- /dev/null +++ b/gobblin-modules/gobblin-service-kafka/build.gradle @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +apply plugin: 'java' + +dependencies { + compile project(":gobblin-runtime") + + compile externalDependency.avro + compile externalDependency.slf4j + compile externalDependency.lombok + compile externalDependency.typesafeConfig + + testCompile externalDependency.testng +} + +configurations { + compile { transitive = false } + // Remove xerces dependencies because of versioning issues. Standard JRE implementation should + // work. See also http://stackoverflow.com/questions/11677572/dealing-with-xerces-hell-in-java-maven + // HADOOP-5254 and MAPREDUCE-5664 + all*.exclude group: 'xml-apis' + all*.exclude group: 'xerces' +} + +test { + workingDir rootProject.rootDir +} + +ext.classification="library" http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java new file mode 100644 index 0000000..879dea4 --- /dev/null +++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/AvroJobSpecDeserializer.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.service; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; + +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.kafka.common.serialization.Deserializer; + +import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter; +import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter; +import org.apache.gobblin.runtime.job_spec.AvroJobSpec; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +/** + * A deserializer that converts a byte array into an {@link AvroJobSpec} + */ +public class AvroJobSpecDeserializer implements Deserializer<AvroJobSpec> { + private BinaryDecoder _decoder; + private SpecificDatumReader<AvroJobSpec> _reader; + private SchemaVersionWriter<?> _versionWriter; + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + InputStream dummyInputStream = new ByteArrayInputStream(new byte[0]); + _decoder = DecoderFactory.get().binaryDecoder(dummyInputStream, null); + _reader = new SpecificDatumReader<AvroJobSpec>(AvroJobSpec.SCHEMA$); + _versionWriter = new FixedSchemaVersionWriter(); + } + + @Override + public AvroJobSpec deserialize(String topic, byte[] data) { + try (InputStream is = new ByteArrayInputStream(data)) { + _versionWriter.readSchemaVersioningInformation(new DataInputStream(is)); + + Decoder decoder = DecoderFactory.get().binaryDecoder(is, _decoder); + + return _reader.read(null, decoder); + } catch (IOException e) { + throw new RuntimeException("Could not decode message", e); + } + } + + @Override + public void close() { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java new file mode 100644 index 0000000..139d204 --- /dev/null +++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service; + +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.Future; +import java.util.regex.Pattern; + +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.commons.lang3.reflect.ConstructorUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.typesafe.config.Config; + +import org.apache.gobblin.kafka.client.ByteArrayBasedKafkaRecord; +import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; +import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient; +import org.apache.gobblin.kafka.client.KafkaConsumerRecord; +import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter; +import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecConsumer; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.job_spec.AvroJobSpec; +import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException; +import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition; +import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic; +import org.apache.gobblin.util.CompletedFuture; +import org.apache.gobblin.util.ConfigUtils; + +import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.VERB_KEY; +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class SimpleKafkaSpecConsumer implements SpecConsumer<Spec>, Closeable { + private static final String CONSUMER_CLIENT_FACTORY_CLASS_KEY = "spec.kafka.consumerClientClassFactory"; + private static final String DEFAULT_CONSUMER_CLIENT_FACTORY_CLASS = + "org.apache.gobblin.kafka.client.Kafka08ConsumerClient$Factory"; + + // Consumer + protected final GobblinKafkaConsumerClient _kafkaConsumer; + protected final List<KafkaPartition> _partitions; + protected final List<Long> _lowWatermark; + protected final List<Long> _nextWatermark; + protected final List<Long> _highWatermark; + + private Iterator<KafkaConsumerRecord> messageIterator = null; + private int currentPartitionIdx = -1; + private boolean isFirstRun = true; + + private final BinaryDecoder _decoder; + private final SpecificDatumReader<AvroJobSpec> _reader; + private final SchemaVersionWriter<?> _versionWriter; + + public SimpleKafkaSpecConsumer(Config config, Optional<Logger> log) { + + // Consumer + String kafkaConsumerClientClass = ConfigUtils.getString(config, CONSUMER_CLIENT_FACTORY_CLASS_KEY, + DEFAULT_CONSUMER_CLIENT_FACTORY_CLASS); + + try { + Class<?> clientFactoryClass = (Class<?>) Class.forName(kafkaConsumerClientClass); + final GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory factory = + (GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory) + ConstructorUtils.invokeConstructor(clientFactoryClass); + + _kafkaConsumer = factory.create(config); + } 
catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) { + if (log.isPresent()) { + log.get().error("Failed to instantiate Kafka consumer from class " + kafkaConsumerClientClass, e); + } + + throw new RuntimeException("Failed to instantiate Kafka consumer", e); + } + + List<KafkaTopic> kafkaTopics = _kafkaConsumer.getFilteredTopics(Collections.EMPTY_LIST, + Lists.newArrayList(Pattern.compile(config.getString(SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY)))); + _partitions = kafkaTopics.get(0).getPartitions(); + _lowWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L)); + _nextWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L)); + _highWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L)); + + InputStream dummyInputStream = new ByteArrayInputStream(new byte[0]); + _decoder = DecoderFactory.get().binaryDecoder(dummyInputStream, null); + _reader = new SpecificDatumReader<AvroJobSpec>(AvroJobSpec.SCHEMA$); + _versionWriter = new FixedSchemaVersionWriter(); + } + + public SimpleKafkaSpecConsumer(Config config, Logger log) { + this(config, Optional.of(log)); + } + + /** Constructor with no logging */ + public SimpleKafkaSpecConsumer(Config config) { + this(config, Optional.<Logger>absent()); + } + + @Override + public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() { + List<Pair<SpecExecutor.Verb, Spec>> changesSpecs = new ArrayList<>(); + initializeWatermarks(); + this.currentPartitionIdx = -1; + while (!allPartitionsFinished()) { + if (currentPartitionFinished()) { + moveToNextPartition(); + continue; + } + if (this.messageIterator == null || !this.messageIterator.hasNext()) { + try { + this.messageIterator = fetchNextMessageBuffer(); + } catch (Exception e) { + log.error(String.format("Failed to fetch next message buffer for partition %s. Will skip this partition.", + getCurrentPartition()), e); + moveToNextPartition(); + continue; + } + if (this.messageIterator == null || !this.messageIterator.hasNext()) { + moveToNextPartition(); + continue; + } + } + while (!currentPartitionFinished()) { + if (!this.messageIterator.hasNext()) { + break; + } + + KafkaConsumerRecord nextValidMessage = this.messageIterator.next(); + + // Even though we ask Kafka to give us a message buffer starting from offset x, it may + // return a buffer that starts from offset smaller than x, so we need to skip messages + // until we get to x. + if (nextValidMessage.getOffset() < _nextWatermark.get(this.currentPartitionIdx)) { + continue; + } + + _nextWatermark.set(this.currentPartitionIdx, nextValidMessage.getNextOffset()); + try { + final AvroJobSpec record; + + if (nextValidMessage instanceof ByteArrayBasedKafkaRecord) { + record = decodeRecord((ByteArrayBasedKafkaRecord)nextValidMessage); + } else if (nextValidMessage instanceof DecodeableKafkaRecord){ + record = ((DecodeableKafkaRecord<?, AvroJobSpec>) nextValidMessage).getValue(); + } else { + throw new IllegalStateException( + "Unsupported KafkaConsumerRecord type. 
The returned record can either be ByteArrayBasedKafkaRecord" + + " or DecodeableKafkaRecord"); + } + + JobSpec.Builder jobSpecBuilder = JobSpec.builder(record.getUri()); + + Properties props = new Properties(); + props.putAll(record.getProperties()); + jobSpecBuilder.withJobCatalogURI(record.getUri()).withVersion(record.getVersion()) + .withDescription(record.getDescription()).withConfigAsProperties(props); + + if (!record.getTemplateUri().isEmpty()) { + jobSpecBuilder.withTemplate(new URI(record.getTemplateUri())); + } + + String verbName = record.getMetadata().get(VERB_KEY); + SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName); + + changesSpecs.add(new ImmutablePair<SpecExecutor.Verb, Spec>(verb, jobSpecBuilder.build())); + } catch (Throwable t) { + log.error("Could not decode record at partition " + this.currentPartitionIdx + + " offset " + nextValidMessage.getOffset()); + } + } + } + + return new CompletedFuture(changesSpecs, null); + } + + private void initializeWatermarks() { + initializeLowWatermarks(); + initializeHighWatermarks(); + } + + private void initializeLowWatermarks() { + try { + int i=0; + for (KafkaPartition kafkaPartition : _partitions) { + if (isFirstRun) { + long earliestOffset = _kafkaConsumer.getEarliestOffset(kafkaPartition); + _lowWatermark.set(i, earliestOffset); + } else { + _lowWatermark.set(i, _highWatermark.get(i)); + } + i++; + } + isFirstRun = false; + } catch (KafkaOffsetRetrievalFailureException e) { + throw new RuntimeException(e); + } + } + + private void initializeHighWatermarks() { + try { + int i=0; + for (KafkaPartition kafkaPartition : _partitions) { + long latestOffset = _kafkaConsumer.getLatestOffset(kafkaPartition); + _highWatermark.set(i, latestOffset); + i++; + } + } catch (KafkaOffsetRetrievalFailureException e) { + throw new RuntimeException(e); + } + } + + private boolean allPartitionsFinished() { + return this.currentPartitionIdx >= _nextWatermark.size(); + } + + private boolean currentPartitionFinished() { + if (this.currentPartitionIdx == -1) { + return true; + } else if (_nextWatermark.get(this.currentPartitionIdx) >= _highWatermark.get(this.currentPartitionIdx)) { + return true; + } else { + return false; + } + } + + private int moveToNextPartition() { + this.messageIterator = null; + return this.currentPartitionIdx ++; + } + + private KafkaPartition getCurrentPartition() { + return _partitions.get(this.currentPartitionIdx); + } + + private Iterator<KafkaConsumerRecord> fetchNextMessageBuffer() { + return _kafkaConsumer.consume(_partitions.get(this.currentPartitionIdx), + _nextWatermark.get(this.currentPartitionIdx), _highWatermark.get(this.currentPartitionIdx)); + } + + private AvroJobSpec decodeRecord(ByteArrayBasedKafkaRecord kafkaConsumerRecord) throws IOException { + InputStream is = new ByteArrayInputStream(kafkaConsumerRecord.getMessageBytes()); + _versionWriter.readSchemaVersioningInformation(new DataInputStream(is)); + + Decoder decoder = DecoderFactory.get().binaryDecoder(is, _decoder); + + return _reader.read(null, decoder); + } + + @Override + public void close() throws IOException { + _kafkaConsumer.close(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java 
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java new file mode 100644 index 0000000..c3dfcb3 --- /dev/null +++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service; + +import java.io.Serializable; +import java.net.URI; +import java.util.concurrent.Future; + +import org.slf4j.Logger; + +import com.google.common.base.Optional; +import com.google.common.io.Closer; +import com.typesafe.config.Config; + +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor; +import org.apache.gobblin.util.CompletedFuture; + +/** + * A {@link SpecExecutor} that uses Kafka as the communication mechanism. + */ +public class SimpleKafkaSpecExecutor extends AbstractSpecExecutor { + public static final String SPEC_KAFKA_TOPICS_KEY = "spec.kafka.topics"; + + + protected static final String VERB_KEY = "Verb"; + + private SpecProducer<Spec> specProducer; + + public SimpleKafkaSpecExecutor(Config config, Optional<Logger> log) { + super(config, log); + specProducer = new SimpleKafkaSpecProducer(config, log); + } + + /** + * Constructor with no logging, necessary for simple use case. + * @param config + */ + public SimpleKafkaSpecExecutor(Config config) { + this(config, Optional.absent()); + } + + @Override + public Future<? 
extends SpecProducer> getProducer() { + return new CompletedFuture<>(this.specProducer, null); + } + + @Override + public Future<String> getDescription() { + return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + specExecutorInstanceUri, null); + } + + @Override + protected void startUp() throws Exception { + optionalCloser = Optional.of(Closer.create()); + specProducer = optionalCloser.get().register((SimpleKafkaSpecProducer) specProducer); + } + + @Override + protected void shutDown() throws Exception { + if (optionalCloser.isPresent()) { + optionalCloser.get().close(); + } else { + log.warn("No Closer is registered in " + this.getClass().getName()); + } + } + + public static class SpecExecutorInstanceDataPacket implements Serializable { + + protected SpecExecutor.Verb _verb; + protected URI _uri; + protected Spec _spec; + + public SpecExecutorInstanceDataPacket(SpecExecutor.Verb verb, URI uri, Spec spec) { + _verb = verb; + _uri = uri; + _spec = spec; + } + + @Override + public String toString() { + return String.format("Verb: %s, URI: %s, Spec: %s", _verb, _uri, _spec); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java new file mode 100644 index 0000000..a5163db --- /dev/null +++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.URI; +import java.util.List; +import java.util.concurrent.Future; + +import org.apache.commons.lang3.reflect.ConstructorUtils; +import org.slf4j.Logger; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.typesafe.config.Config; + +import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer; +import org.apache.gobblin.metrics.reporter.util.AvroSerializer; +import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.runtime.job_spec.AvroJobSpec; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.writer.AsyncDataWriter; +import org.apache.gobblin.writer.WriteCallback; + +import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.VERB_KEY; +import javax.annotation.concurrent.NotThreadSafe; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@NotThreadSafe +public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable { + private static final String KAFKA_DATA_WRITER_CLASS_KEY = "spec.kafka.dataWriterClass"; + private static final String DEFAULT_KAFKA_DATA_WRITER_CLASS = + "org.apache.gobblin.kafka.writer.Kafka08DataWriter"; + + // Producer + protected AsyncDataWriter<byte[]> _kafkaProducer; + private final AvroSerializer<AvroJobSpec> _serializer; + private Config _config; + private final String _kafkaProducerClassName; + + public SimpleKafkaSpecProducer(Config config, Optional<Logger> log) { + _kafkaProducerClassName = ConfigUtils.getString(config, KAFKA_DATA_WRITER_CLASS_KEY, + DEFAULT_KAFKA_DATA_WRITER_CLASS); + + try { + _serializer = new AvroBinarySerializer<>(AvroJobSpec.SCHEMA$, new FixedSchemaVersionWriter()); + _config = config; + } catch (IOException e) { + throw new RuntimeException("Could not create AvroBinarySerializer", e); + } + } + + public SimpleKafkaSpecProducer(Config config, Logger log) { + this(config, Optional.of(log)); + } + + /** Constructor with no logging */ + public SimpleKafkaSpecProducer(Config config) { + this(config, Optional.<Logger>absent()); + } + + @Override + public Future<?> addSpec(Spec addedSpec) { + AvroJobSpec avroJobSpec = convertToAvroJobSpec(addedSpec, SpecExecutor.Verb.ADD); + + log.info("Adding Spec: " + addedSpec + " using Kafka."); + + return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY); + } + + @Override + public Future<?> updateSpec(Spec updatedSpec) { + AvroJobSpec avroJobSpec = convertToAvroJobSpec(updatedSpec, SpecExecutor.Verb.UPDATE); + + log.info("Updating Spec: " + updatedSpec + " using Kafka."); + + return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY); + } + + @Override + public Future<?> deleteSpec(URI deletedSpecURI) { + + AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString()) + .setMetadata(ImmutableMap.of(VERB_KEY, SpecExecutor.Verb.DELETE.name())).build(); + + log.info("Deleting Spec: " + deletedSpecURI + " using Kafka."); + + return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY); + } + + @Override + public Future<? 
extends List<Spec>> listSpecs() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException { + _kafkaProducer.close(); + } + + private AsyncDataWriter<byte[]> getKafkaProducer() { + if (null == _kafkaProducer) { + try { + Class<?> kafkaProducerClass = (Class<?>) Class.forName(_kafkaProducerClassName); + _kafkaProducer = (AsyncDataWriter<byte[]>) ConstructorUtils.invokeConstructor(kafkaProducerClass, + ConfigUtils.configToProperties(_config)); + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) { + log.error("Failed to instantiate Kafka producer from class " + _kafkaProducerClassName, e); + + throw new RuntimeException("Failed to instantiate Kafka producer", e); + } + } + return _kafkaProducer; + } + + private AvroJobSpec convertToAvroJobSpec(Spec spec, SpecExecutor.Verb verb) { + if (spec instanceof JobSpec) { + JobSpec jobSpec = (JobSpec) spec; + AvroJobSpec.Builder avroJobSpecBuilder = AvroJobSpec.newBuilder(); + + avroJobSpecBuilder.setUri(jobSpec.getUri().toString()).setVersion(jobSpec.getVersion()) + .setDescription(jobSpec.getDescription()).setProperties(Maps.fromProperties(jobSpec.getConfigAsProperties())) + .setMetadata(ImmutableMap.of(VERB_KEY, verb.name())); + + if (jobSpec.getTemplateURI().isPresent()) { + avroJobSpecBuilder.setTemplateUri(jobSpec.getTemplateURI().get().toString()); + } + + return avroJobSpecBuilder.build(); + } else { + throw new RuntimeException("Unsupported spec type " + spec.getClass()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/590b872b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java new file mode 100644 index 0000000..7d7b702 --- /dev/null +++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service; + +import java.io.Closeable; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.AbstractIdleService; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.MutableJobCatalog; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecConsumer; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.job_monitor.AvroJobSpecKafkaJobMonitor; +import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor; +import org.apache.gobblin.runtime.std.DefaultJobCatalogListenerImpl; +import org.apache.gobblin.util.CompletedFuture; +import org.apache.gobblin.util.ConfigUtils; + +import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY; +import lombok.extern.slf4j.Slf4j; + +/** + * SpecConsumer that consumes from Kafka in a streaming manner. + * Extends {@link AbstractIdleService} for starting up and shutting down. + */ +@Slf4j +public class StreamingKafkaSpecConsumer extends AbstractIdleService implements SpecConsumer<Spec>, Closeable { + public static final String SPEC_STREAMING_BLOCKING_QUEUE_SIZE = "spec.StreamingBlockingQueueSize"; + private static final int DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE = 100; + private final AvroJobSpecKafkaJobMonitor _jobMonitor; + private final BlockingQueue<ImmutablePair<SpecExecutor.Verb, Spec>> _jobSpecQueue; + + public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Optional<Logger> log) { + String topic = config.getString(SPEC_KAFKA_TOPICS_KEY); + Config defaults = ConfigFactory.parseMap(ImmutableMap.of(AvroJobSpecKafkaJobMonitor.TOPIC_KEY, topic, + KafkaJobMonitor.KAFKA_AUTO_OFFSET_RESET_KEY, KafkaJobMonitor.KAFKA_AUTO_OFFSET_RESET_SMALLEST)); + + try { + _jobMonitor = (AvroJobSpecKafkaJobMonitor)(new AvroJobSpecKafkaJobMonitor.Factory()) + .forConfig(config.withFallback(defaults), jobCatalog); + } catch (IOException e) { + throw new RuntimeException("Could not create job monitor", e); + } + + _jobSpecQueue = new LinkedBlockingQueue<>(ConfigUtils.getInt(config, SPEC_STREAMING_BLOCKING_QUEUE_SIZE, + DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE)); + + // listener will add job specs to a blocking queue to send to callers of changedSpecs() + jobCatalog.addListener(new JobSpecListener()); + } + + public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Logger log) { + this(config, jobCatalog, Optional.of(log)); + } + + /** Constructor with no logging */ + public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog) { + this(config, jobCatalog, Optional.<Logger>absent()); + } + + /** + * This method returns job specs received from Kafka. It will block if there are no job specs. + * @return list of (verb, jobspec) pairs. + */ + @Override + public Future<? 
extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() { + List<Pair<SpecExecutor.Verb, Spec>> changedSpecs = new ArrayList<>(); + + try { + Pair<SpecExecutor.Verb, Spec> specPair = _jobSpecQueue.take(); + + do { + changedSpecs.add(specPair); + + // if there are more elements then pass them along in this call + specPair = _jobSpecQueue.poll(); + } while (specPair != null); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + return new CompletedFuture<>(changedSpecs, null); + } + + @Override + protected void startUp() { + _jobMonitor.startAsync().awaitRunning(); + } + + @Override + protected void shutDown() { + _jobMonitor.stopAsync().awaitTerminated(); + } + + @Override + public void close() throws IOException { + shutDown(); + } + + /** + * JobCatalog listener that puts messages into a blocking queue for consumption by changedSpecs method of + * {@link StreamingKafkaSpecConsumer} + */ + protected class JobSpecListener extends DefaultJobCatalogListenerImpl { + public JobSpecListener() { + super(StreamingKafkaSpecConsumer.this.log); + } + + @Override public void onAddJob(JobSpec addedJob) { + super.onAddJob(addedJob); + + try { + _jobSpecQueue.put(new ImmutablePair<SpecExecutor.Verb, Spec>(SpecExecutor.Verb.ADD, addedJob)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + @Override public void onDeleteJob(URI deletedJobURI, String deletedJobVersion) { + super.onDeleteJob(deletedJobURI, deletedJobVersion); + try { + JobSpec.Builder jobSpecBuilder = JobSpec.builder(deletedJobURI); + + Properties props = new Properties(); + jobSpecBuilder.withVersion(deletedJobVersion).withConfigAsProperties(props); + + _jobSpecQueue.put(new ImmutablePair<SpecExecutor.Verb, Spec>(SpecExecutor.Verb.DELETE, jobSpecBuilder.build())); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + @Override public void onUpdateJob(JobSpec updatedJob) { + super.onUpdateJob(updatedJob); + + try { + _jobSpecQueue.put(new ImmutablePair<SpecExecutor.Verb, Spec>(SpecExecutor.Verb.UPDATE, updatedJob)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } +} \ No newline at end of file
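----------------------------------------------------------------------
For reference, a minimal sketch of how a caller might wire up the new SimpleKafkaSpecProducer
with the pluggable Kafka 0.9 writer. The topic name, broker address, writer property keys and
the JobSpec values below are placeholder assumptions for illustration, not settings defined by
this change.

    import java.net.URI;
    import java.util.Properties;

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    import org.apache.gobblin.runtime.api.JobSpec;
    import org.apache.gobblin.service.SimpleKafkaSpecProducer;

    public class SpecProducerExample {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Select the async Kafka writer; the Kafka 0.8 writer is the default when this key is unset.
        props.setProperty("spec.kafka.dataWriterClass", "org.apache.gobblin.kafka.writer.Kafka09DataWriter");
        // Placeholder topic/broker settings for the underlying writer (keys assumed; adjust to the deployment).
        props.setProperty("writer.kafka.topic", "gobblin-job-specs");
        props.setProperty("writer.kafka.producerConfig.bootstrap.servers", "localhost:9092");

        Config config = ConfigFactory.parseProperties(props);

        try (SimpleKafkaSpecProducer producer = new SimpleKafkaSpecProducer(config)) {
          // A throwaway JobSpec purely for illustration.
          JobSpec jobSpec = JobSpec.builder(new URI("gobblin-job:///examples/hello-world"))
              .withVersion("1")
              .withConfigAsProperties(new Properties())
              .build();

          // addSpec() converts the JobSpec to an AvroJobSpec tagged with Verb.ADD and hands the
          // serialized bytes to the async Kafka writer; blocking on the Future waits for the write.
          producer.addSpec(jobSpec).get();
        }
      }
    }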
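----------------------------------------------------------------------
Likewise, a minimal sketch of draining (verb, spec) changes from the new StreamingKafkaSpecConsumer.
It assumes the caller supplies a MutableJobCatalog and a Config that already carries the Kafka
settings needed by the underlying AvroJobSpecKafkaJobMonitor; the topic name is a placeholder.

    import java.util.List;

    import org.apache.commons.lang3.tuple.Pair;

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigValueFactory;

    import org.apache.gobblin.runtime.api.MutableJobCatalog;
    import org.apache.gobblin.runtime.api.Spec;
    import org.apache.gobblin.runtime.api.SpecExecutor;
    import org.apache.gobblin.service.SimpleKafkaSpecExecutor;
    import org.apache.gobblin.service.StreamingKafkaSpecConsumer;

    public class SpecConsumerExample {

      /** Drains one batch of spec changes; jobCatalog and Kafka settings come from the caller. */
      public static void drainSpecs(Config baseConfig, MutableJobCatalog jobCatalog) throws Exception {
        // Point the consumer at the spec topic ("gobblin-job-specs" is a placeholder).
        Config config = baseConfig.withValue(SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY,
            ConfigValueFactory.fromAnyRef("gobblin-job-specs"));

        StreamingKafkaSpecConsumer consumer = new StreamingKafkaSpecConsumer(config, jobCatalog);
        consumer.startAsync().awaitRunning();

        try {
          // changedSpecs() blocks until the JobSpecListener has queued at least one pair,
          // then returns everything currently available.
          List<Pair<SpecExecutor.Verb, Spec>> changes = consumer.changedSpecs().get();
          for (Pair<SpecExecutor.Verb, Spec> change : changes) {
            System.out.println(change.getLeft() + " -> " + change.getRight().getUri());
          }
        } finally {
          consumer.close();
        }
      }
    }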
