http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstance.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstance.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstance.java deleted file mode 100644 index a60d8e2..0000000 --- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstance.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.service; - -import com.google.common.util.concurrent.AbstractIdleService; -import java.io.Serializable; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Future; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.collect.Maps; -import com.typesafe.config.Config; - -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.runtime.api.SpecExecutorInstance; -import org.apache.gobblin.util.CompletedFuture; -import org.apache.gobblin.util.ConfigUtils; - - -public class SimpleKafkaSpecExecutorInstance extends AbstractIdleService implements SpecExecutorInstance { - public static final String SPEC_KAFKA_TOPICS_KEY = "spec.kafka.topics"; - protected static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults(); - protected static final Splitter SPLIT_BY_COLON = Splitter.on(":").omitEmptyStrings().trimResults(); - - // Executor Instance - protected final Config _config; - protected final Logger _log; - protected final URI _specExecutorInstanceUri; - protected final Map<String, String> _capabilities; - - protected static final String VERB_KEY = "Verb"; - - public SimpleKafkaSpecExecutorInstance(Config config, Optional<Logger> log) { - _config = config; - _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass()); - try { - _specExecutorInstanceUri = new URI(ConfigUtils.getString(config, ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, - "NA")); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } - _capabilities = Maps.newHashMap(); - if (config.hasPath(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY)) { - String capabilitiesStr = config.getString(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY); - List<String> capabilities = SPLIT_BY_COMMA.splitToList(capabilitiesStr); - for (String capability : capabilities) { - List<String> currentCapability = SPLIT_BY_COLON.splitToList(capability); - Preconditions.checkArgument(currentCapability.size() == 2, "Only one source:destination pair is supported " - + "per capability, found: " + currentCapability); - _capabilities.put(currentCapability.get(0), currentCapability.get(1)); - } - } - } - - @Override - public URI getUri() { - return _specExecutorInstanceUri; - } - - @Override - public Future<String> getDescription() { - return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + _specExecutorInstanceUri, null); - } - - @Override - public Future<Config> getConfig() { - return new CompletedFuture<>(_config, null); - } - - @Override - public Future<String> getHealth() { - return new CompletedFuture<>("Healthy", null); - } - - @Override - public Future<? extends Map<String, String>> getCapabilities() { - return new CompletedFuture<>(_capabilities, null); - } - - @Override - protected void startUp() throws Exception { - // nothing to do in default implementation - } - - @Override - protected void shutDown() throws Exception { - // nothing to do in default implementation - } - - public static class SpecExecutorInstanceDataPacket implements Serializable { - - protected Verb _verb; - protected URI _uri; - protected Spec _spec; - - public SpecExecutorInstanceDataPacket(Verb verb, URI uri, Spec spec) { - _verb = verb; - _uri = uri; - _spec = spec; - } - - @Override - public String toString() { - return String.format("Verb: %s, URI: %s, Spec: %s", _verb, _uri, _spec); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceConsumer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceConsumer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceConsumer.java deleted file mode 100644 index 90960e7..0000000 --- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceConsumer.java +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.service; - -import java.io.ByteArrayInputStream; -import java.io.Closeable; -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.Future; -import java.util.regex.Pattern; - -import org.apache.avro.io.BinaryDecoder; -import org.apache.avro.io.Decoder; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.specific.SpecificDatumReader; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; - -import com.google.common.base.Optional; -import com.google.common.collect.Lists; -import com.typesafe.config.Config; - -import org.apache.gobblin.kafka.client.ByteArrayBasedKafkaRecord; -import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; -import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient; -import org.apache.gobblin.kafka.client.Kafka08ConsumerClient; -import org.apache.gobblin.kafka.client.KafkaConsumerRecord; -import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter; -import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter; -import org.apache.gobblin.runtime.api.JobSpec; -import org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.runtime.api.SpecExecutorInstanceConsumer; -import org.apache.gobblin.runtime.job_spec.AvroJobSpec; -import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException; -import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition; -import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic; -import org.apache.gobblin.util.CompletedFuture; - -public class SimpleKafkaSpecExecutorInstanceConsumer extends SimpleKafkaSpecExecutorInstance - implements SpecExecutorInstanceConsumer<Spec>, Closeable { - - // Consumer - protected final GobblinKafkaConsumerClient _kafka08Consumer; - protected final List<KafkaPartition> _partitions; - protected final List<Long> _lowWatermark; - protected final List<Long> _nextWatermark; - protected final List<Long> _highWatermark; - - private Iterator<KafkaConsumerRecord> messageIterator = null; - private int currentPartitionIdx = -1; - private boolean isFirstRun = true; - - private final BinaryDecoder _decoder; - private final SpecificDatumReader<AvroJobSpec> _reader; - private final SchemaVersionWriter<?> _versionWriter; - - public SimpleKafkaSpecExecutorInstanceConsumer(Config config, Optional<Logger> log) { - super(config, log); - - // Consumer - _kafka08Consumer = new Kafka08ConsumerClient.Factory().create(config); - List<KafkaTopic> kafkaTopics = _kafka08Consumer.getFilteredTopics(Collections.EMPTY_LIST, - Lists.newArrayList(Pattern.compile(config.getString(SPEC_KAFKA_TOPICS_KEY)))); - _partitions = kafkaTopics.get(0).getPartitions(); - _lowWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L)); - _nextWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L)); - _highWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L)); - - InputStream dummyInputStream = new ByteArrayInputStream(new byte[0]); - _decoder = DecoderFactory.get().binaryDecoder(dummyInputStream, null); - _reader = new SpecificDatumReader<AvroJobSpec>(AvroJobSpec.SCHEMA$); - _versionWriter = new FixedSchemaVersionWriter(); - } - - public SimpleKafkaSpecExecutorInstanceConsumer(Config config, Logger log) { - this(config, Optional.of(log)); - } - - /** Constructor with no logging */ - public SimpleKafkaSpecExecutorInstanceConsumer(Config config) { - this(config, Optional.<Logger>absent()); - } - - @Override - public Future<? extends List<Pair<Verb, Spec>>> changedSpecs() { - List<Pair<Verb, Spec>> changesSpecs = new ArrayList<>(); - initializeWatermarks(); - this.currentPartitionIdx = -1; - while (!allPartitionsFinished()) { - if (currentPartitionFinished()) { - moveToNextPartition(); - continue; - } - if (this.messageIterator == null || !this.messageIterator.hasNext()) { - try { - this.messageIterator = fetchNextMessageBuffer(); - } catch (Exception e) { - _log.error(String.format("Failed to fetch next message buffer for partition %s. Will skip this partition.", - getCurrentPartition()), e); - moveToNextPartition(); - continue; - } - if (this.messageIterator == null || !this.messageIterator.hasNext()) { - moveToNextPartition(); - continue; - } - } - while (!currentPartitionFinished()) { - if (!this.messageIterator.hasNext()) { - break; - } - - KafkaConsumerRecord nextValidMessage = this.messageIterator.next(); - - // Even though we ask Kafka to give us a message buffer starting from offset x, it may - // return a buffer that starts from offset smaller than x, so we need to skip messages - // until we get to x. - if (nextValidMessage.getOffset() < _nextWatermark.get(this.currentPartitionIdx)) { - continue; - } - - _nextWatermark.set(this.currentPartitionIdx, nextValidMessage.getNextOffset()); - try { - final AvroJobSpec record; - - if (nextValidMessage instanceof ByteArrayBasedKafkaRecord) { - record = decodeRecord((ByteArrayBasedKafkaRecord)nextValidMessage); - } else if (nextValidMessage instanceof DecodeableKafkaRecord){ - record = ((DecodeableKafkaRecord<?, AvroJobSpec>) nextValidMessage).getValue(); - } else { - throw new IllegalStateException( - "Unsupported KafkaConsumerRecord type. The returned record can either be ByteArrayBasedKafkaRecord" - + " or DecodeableKafkaRecord"); - } - - JobSpec.Builder jobSpecBuilder = JobSpec.builder(record.getUri()); - - Properties props = new Properties(); - props.putAll(record.getProperties()); - jobSpecBuilder.withJobCatalogURI(record.getUri()).withVersion(record.getVersion()) - .withDescription(record.getDescription()).withConfigAsProperties(props); - - if (!record.getTemplateUri().isEmpty()) { - jobSpecBuilder.withTemplate(new URI(record.getTemplateUri())); - } - - String verbName = record.getMetadata().get(VERB_KEY); - Verb verb = Verb.valueOf(verbName); - - changesSpecs.add(new ImmutablePair<Verb, Spec>(verb, jobSpecBuilder.build())); - } catch (Throwable t) { - _log.error("Could not decode record at partition " + this.currentPartitionIdx + - " offset " + nextValidMessage.getOffset()); - } - } - } - - return new CompletedFuture(changesSpecs, null); - } - - private void initializeWatermarks() { - initializeLowWatermarks(); - initializeHighWatermarks(); - } - - private void initializeLowWatermarks() { - try { - int i=0; - for (KafkaPartition kafkaPartition : _partitions) { - if (isFirstRun) { - long earliestOffset = _kafka08Consumer.getEarliestOffset(kafkaPartition); - _lowWatermark.set(i, earliestOffset); - } else { - _lowWatermark.set(i, _highWatermark.get(i)); - } - i++; - } - isFirstRun = false; - } catch (KafkaOffsetRetrievalFailureException e) { - throw new RuntimeException(e); - } - } - - private void initializeHighWatermarks() { - try { - int i=0; - for (KafkaPartition kafkaPartition : _partitions) { - long latestOffset = _kafka08Consumer.getLatestOffset(kafkaPartition); - _highWatermark.set(i, latestOffset); - i++; - } - } catch (KafkaOffsetRetrievalFailureException e) { - throw new RuntimeException(e); - } - } - - private boolean allPartitionsFinished() { - return this.currentPartitionIdx >= _nextWatermark.size(); - } - - private boolean currentPartitionFinished() { - if (this.currentPartitionIdx == -1) { - return true; - } else if (_nextWatermark.get(this.currentPartitionIdx) >= _highWatermark.get(this.currentPartitionIdx)) { - return true; - } else { - return false; - } - } - - private int moveToNextPartition() { - this.messageIterator = null; - return this.currentPartitionIdx ++; - } - - private KafkaPartition getCurrentPartition() { - return _partitions.get(this.currentPartitionIdx); - } - - private Iterator<KafkaConsumerRecord> fetchNextMessageBuffer() { - return _kafka08Consumer.consume(_partitions.get(this.currentPartitionIdx), - _nextWatermark.get(this.currentPartitionIdx), _highWatermark.get(this.currentPartitionIdx)); - } - - private AvroJobSpec decodeRecord(ByteArrayBasedKafkaRecord kafkaConsumerRecord) throws IOException { - InputStream is = new ByteArrayInputStream(kafkaConsumerRecord.getMessageBytes()); - _versionWriter.readSchemaVersioningInformation(new DataInputStream(is)); - - Decoder decoder = DecoderFactory.get().binaryDecoder(is, _decoder); - - return _reader.read(null, decoder); - } - - @Override - public void close() throws IOException { - _kafka08Consumer.close(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceProducer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceProducer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceProducer.java deleted file mode 100644 index cdafe06..0000000 --- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceProducer.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.service; - -import java.io.Closeable; -import java.io.IOException; -import java.net.URI; -import java.util.List; -import java.util.concurrent.Future; - -import javax.annotation.concurrent.NotThreadSafe; - -import org.apache.avro.mapred.AvroJob; -import org.slf4j.Logger; - -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import com.typesafe.config.Config; - -import org.apache.gobblin.kafka.writer.Kafka08DataWriter; -import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer; -import org.apache.gobblin.metrics.reporter.util.AvroSerializer; -import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter; -import org.apache.gobblin.runtime.api.JobSpec; -import org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer; -import org.apache.gobblin.runtime.job_spec.AvroJobSpec; -import org.apache.gobblin.util.ConfigUtils; -import org.apache.gobblin.writer.WriteCallback; - - -@NotThreadSafe -public class SimpleKafkaSpecExecutorInstanceProducer extends SimpleKafkaSpecExecutorInstance - implements SpecExecutorInstanceProducer<Spec>, Closeable { - - // Producer - protected Kafka08DataWriter<byte[]> _kafka08Producer; - private final AvroSerializer<AvroJobSpec> _serializer; - - public SimpleKafkaSpecExecutorInstanceProducer(Config config, Optional<Logger> log) { - super(config, log); - - try { - _serializer = new AvroBinarySerializer<>(AvroJobSpec.SCHEMA$, new FixedSchemaVersionWriter()); - } catch (IOException e) { - throw new RuntimeException("Could not create AvroBinarySerializer", e); - } - } - - public SimpleKafkaSpecExecutorInstanceProducer(Config config, Logger log) { - this(config, Optional.of(log)); - } - - /** Constructor with no logging */ - public SimpleKafkaSpecExecutorInstanceProducer(Config config) { - this(config, Optional.<Logger>absent()); - } - - @Override - public Future<?> addSpec(Spec addedSpec) { - AvroJobSpec avroJobSpec = convertToAvroJobSpec(addedSpec, Verb.ADD); - - _log.info("Adding Spec: " + addedSpec + " using Kafka."); - - return getKafka08Producer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY); - } - - @Override - public Future<?> updateSpec(Spec updatedSpec) { - AvroJobSpec avroJobSpec = convertToAvroJobSpec(updatedSpec, Verb.UPDATE); - - _log.info("Updating Spec: " + updatedSpec + " using Kafka."); - - return getKafka08Producer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY); - } - - @Override - public Future<?> deleteSpec(URI deletedSpecURI) { - - AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString()) - .setMetadata(ImmutableMap.of(VERB_KEY, Verb.DELETE.name())).build(); - - _log.info("Deleting Spec: " + deletedSpecURI + " using Kafka."); - - return getKafka08Producer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY); - } - - @Override - public Future<? extends List<Spec>> listSpecs() { - throw new UnsupportedOperationException(); - } - - @Override - public void close() throws IOException { - _kafka08Producer.close(); - } - - private Kafka08DataWriter<byte[]> getKafka08Producer() { - if (null == _kafka08Producer) { - _kafka08Producer = new Kafka08DataWriter<byte[]>(ConfigUtils.configToProperties(_config)); - } - return _kafka08Producer; - } - - private AvroJobSpec convertToAvroJobSpec(Spec spec, Verb verb) { - if (spec instanceof JobSpec) { - JobSpec jobSpec = (JobSpec) spec; - AvroJobSpec.Builder avroJobSpecBuilder = AvroJobSpec.newBuilder(); - - avroJobSpecBuilder.setUri(jobSpec.getUri().toString()).setVersion(jobSpec.getVersion()) - .setDescription(jobSpec.getDescription()).setProperties(Maps.fromProperties(jobSpec.getConfigAsProperties())) - .setMetadata(ImmutableMap.of(VERB_KEY, verb.name())); - - if (jobSpec.getTemplateURI().isPresent()) { - avroJobSpecBuilder.setTemplateUri(jobSpec.getTemplateURI().get().toString()); - } - - return avroJobSpecBuilder.build(); - } else { - throw new RuntimeException("Unsupported spec type " + spec.getClass()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java new file mode 100644 index 0000000..13aae6f --- /dev/null +++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service; + +import java.io.Closeable; +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.concurrent.Future; +import javax.annotation.concurrent.NotThreadSafe; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.typesafe.config.Config; + +import org.slf4j.Logger; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.kafka.writer.Kafka08DataWriter; +import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer; +import org.apache.gobblin.metrics.reporter.util.AvroSerializer; +import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.job_spec.AvroJobSpec; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.writer.WriteCallback; +import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.*; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@NotThreadSafe +public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable { + + // Producer + protected Kafka08DataWriter<byte[]> _kafka08Producer; + private final AvroSerializer<AvroJobSpec> _serializer; + private Config _config; + + public SimpleKafkaSpecProducer(Config config, Optional<Logger> log) { + + try { + _serializer = new AvroBinarySerializer<>(AvroJobSpec.SCHEMA$, new FixedSchemaVersionWriter()); + _config = config; + } catch (IOException e) { + throw new RuntimeException("Could not create AvroBinarySerializer", e); + } + } + + public SimpleKafkaSpecProducer(Config config, Logger log) { + this(config, Optional.of(log)); + } + + /** Constructor with no logging */ + public SimpleKafkaSpecProducer(Config config) { + this(config, Optional.<Logger>absent()); + } + + @Override + public Future<?> addSpec(Spec addedSpec) { + AvroJobSpec avroJobSpec = convertToAvroJobSpec(addedSpec, SpecExecutor.Verb.ADD); + + log.info("Adding Spec: " + addedSpec + " using Kafka."); + + return getKafka08Producer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY); + } + + @Override + public Future<?> updateSpec(Spec updatedSpec) { + AvroJobSpec avroJobSpec = convertToAvroJobSpec(updatedSpec, SpecExecutor.Verb.UPDATE); + + log.info("Updating Spec: " + updatedSpec + " using Kafka."); + + return getKafka08Producer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY); + } + + @Override + public Future<?> deleteSpec(URI deletedSpecURI) { + + AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString()) + .setMetadata(ImmutableMap.of(VERB_KEY, Verb.DELETE.name())).build(); + + log.info("Deleting Spec: " + deletedSpecURI + " using Kafka."); + + return getKafka08Producer().write(_serializer.serializeRecord(avroJobSpec), WriteCallback.EMPTY); + } + + @Override + public Future<? extends List<Spec>> listSpecs() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException { + _kafka08Producer.close(); + } + + private Kafka08DataWriter<byte[]> getKafka08Producer() { + if (null == _kafka08Producer) { + _kafka08Producer = new Kafka08DataWriter<byte[]>(ConfigUtils.configToProperties(_config)); + } + return _kafka08Producer; + } + + private AvroJobSpec convertToAvroJobSpec(Spec spec, Verb verb) { + if (spec instanceof JobSpec) { + JobSpec jobSpec = (JobSpec) spec; + AvroJobSpec.Builder avroJobSpecBuilder = AvroJobSpec.newBuilder(); + + avroJobSpecBuilder.setUri(jobSpec.getUri().toString()).setVersion(jobSpec.getVersion()) + .setDescription(jobSpec.getDescription()).setProperties(Maps.fromProperties(jobSpec.getConfigAsProperties())) + .setMetadata(ImmutableMap.of(VERB_KEY, verb.name())); + + if (jobSpec.getTemplateURI().isPresent()) { + avroJobSpecBuilder.setTemplateUri(jobSpec.getTemplateURI().get().toString()); + } + + return avroJobSpecBuilder.build(); + } else { + throw new RuntimeException("Unsupported spec type " + spec.getClass()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java new file mode 100644 index 0000000..fd42211 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service; + +import java.io.Closeable; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.google.common.util.concurrent.AbstractIdleService; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.MutableJobCatalog; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecConsumer; +import org.apache.gobblin.runtime.job_monitor.AvroJobSpecKafkaJobMonitor; +import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor; +import org.apache.gobblin.runtime.std.DefaultJobCatalogListenerImpl; +import org.apache.gobblin.util.CompletedFuture; +import org.apache.gobblin.util.ConfigUtils; +import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.*; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +/** + * SpecConsumer that consumes from kafka in a streaming manner + * Implemented {@link AbstractIdleService} for starting up and shutting down. + */ +public class StreamingKafkaSpecConsumer extends AbstractIdleService implements SpecConsumer<Spec>, Closeable { + public static final String SPEC_STREAMING_BLOCKING_QUEUE_SIZE = "spec.StreamingBlockingQueueSize"; + private static final int DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE = 100; + private final AvroJobSpecKafkaJobMonitor _jobMonitor; + private final BlockingQueue<ImmutablePair<Verb, Spec>> _jobSpecQueue; + + public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Optional<Logger> log) { + String topic = config.getString(SPEC_KAFKA_TOPICS_KEY); + Config defaults = ConfigFactory.parseMap(ImmutableMap.of(AvroJobSpecKafkaJobMonitor.TOPIC_KEY, topic, + KafkaJobMonitor.KAFKA_AUTO_OFFSET_RESET_KEY, KafkaJobMonitor.KAFKA_AUTO_OFFSET_RESET_SMALLEST)); + + try { + _jobMonitor = (AvroJobSpecKafkaJobMonitor)(new AvroJobSpecKafkaJobMonitor.Factory()) + .forConfig(config.withFallback(defaults), jobCatalog); + } catch (IOException e) { + throw new RuntimeException("Could not create job monitor", e); + } + + _jobSpecQueue = new LinkedBlockingQueue<>(ConfigUtils.getInt(config, "SPEC_STREAMING_BLOCKING_QUEUE_SIZE", + DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE)); + + // listener will add job specs to a blocking queue to send to callers of changedSpecs() + jobCatalog.addListener(new JobSpecListener()); + } + + public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog, Logger log) { + this(config, jobCatalog, Optional.of(log)); + } + + /** Constructor with no logging */ + public StreamingKafkaSpecConsumer(Config config, MutableJobCatalog jobCatalog) { + this(config, jobCatalog, Optional.<Logger>absent()); + } + + /** + * This method returns job specs receive from Kafka. It will block if there are no job specs. + * @return list of (verb, jobspecs) pairs. + */ + @Override + public Future<? extends List<Pair<Verb, Spec>>> changedSpecs() { + List<Pair<Verb, Spec>> changesSpecs = new ArrayList<>(); + + try { + Pair<Verb, Spec> specPair = _jobSpecQueue.take(); + + do { + changesSpecs.add(specPair); + + // if there are more elements then pass them along in this call + specPair = _jobSpecQueue.poll(); + } while (specPair != null); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + return new CompletedFuture(changesSpecs, null); + } + + @Override + protected void startUp() { + _jobMonitor.startAsync().awaitRunning(); + } + + @Override + protected void shutDown() { + _jobMonitor.stopAsync().awaitTerminated(); + } + + @Override + public void close() throws IOException { + shutDown(); + } + + /** + * JobCatalog listener that puts messages into a blocking queue for consumption by changedSpecs method of + * {@link StreamingKafkaSpecConsumer} + */ + protected class JobSpecListener extends DefaultJobCatalogListenerImpl { + public JobSpecListener() { + super(StreamingKafkaSpecConsumer.this.log); + } + + @Override public void onAddJob(JobSpec addedJob) { + super.onAddJob(addedJob); + + try { + _jobSpecQueue.put(new ImmutablePair<Verb, Spec>(Verb.ADD, addedJob)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + @Override public void onDeleteJob(URI deletedJobURI, String deletedJobVersion) { + super.onDeleteJob(deletedJobURI, deletedJobVersion); + try { + JobSpec.Builder jobSpecBuilder = JobSpec.builder(deletedJobURI); + + Properties props = new Properties(); + jobSpecBuilder.withVersion(deletedJobVersion).withConfigAsProperties(props); + + _jobSpecQueue.put(new ImmutablePair<Verb, Spec>(Verb.DELETE, jobSpecBuilder.build())); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + @Override public void onUpdateJob(JobSpec updatedJob) { + super.onUpdateJob(updatedJob); + + try { + _jobSpecQueue.put(new ImmutablePair<Verb, Spec>(Verb.UPDATE, updatedJob)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorInstanceConsumer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorInstanceConsumer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorInstanceConsumer.java deleted file mode 100644 index ac7fe03..0000000 --- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorInstanceConsumer.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.service; - -import org.apache.gobblin.util.ConfigUtils; -import java.io.Closeable; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; - -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableMap; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - -import org.apache.gobblin.runtime.api.JobSpec; -import org.apache.gobblin.runtime.api.MutableJobCatalog; -import org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.runtime.api.SpecExecutorInstanceConsumer; -import org.apache.gobblin.runtime.job_monitor.AvroJobSpecKafkaJobMonitor; -import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor; -import org.apache.gobblin.runtime.std.DefaultJobCatalogListenerImpl; -import org.apache.gobblin.util.CompletedFuture; - - -/** - * SpecExecutorInstanceConsumer that consumes from kafka in a streaming manner - */ -public class StreamingKafkaSpecExecutorInstanceConsumer extends SimpleKafkaSpecExecutorInstance - implements SpecExecutorInstanceConsumer<Spec>, Closeable { - public static final String SPEC_STREAMING_BLOCKING_QUEUE_SIZE = "spec.StreamingBlockingQueueSize"; - private static final int DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE = 100; - private final AvroJobSpecKafkaJobMonitor _jobMonitor; - private final BlockingQueue<ImmutablePair<Verb, Spec>> _jobSpecQueue; - - public StreamingKafkaSpecExecutorInstanceConsumer(Config config, MutableJobCatalog jobCatalog, Optional<Logger> log) { - super(config, log); - String topic = config.getString(SPEC_KAFKA_TOPICS_KEY); - Config defaults = ConfigFactory.parseMap(ImmutableMap.of(AvroJobSpecKafkaJobMonitor.TOPIC_KEY, topic, - KafkaJobMonitor.KAFKA_AUTO_OFFSET_RESET_KEY, KafkaJobMonitor.KAFKA_AUTO_OFFSET_RESET_SMALLEST)); - - try { - _jobMonitor = (AvroJobSpecKafkaJobMonitor)(new AvroJobSpecKafkaJobMonitor.Factory()) - .forConfig(config.withFallback(defaults), jobCatalog); - } catch (IOException e) { - throw new RuntimeException("Could not create job monitor", e); - } - - _jobSpecQueue = new LinkedBlockingQueue<>(ConfigUtils.getInt(config, "SPEC_STREAMING_BLOCKING_QUEUE_SIZE", - DEFAULT_SPEC_STREAMING_BLOCKING_QUEUE_SIZE)); - - // listener will add job specs to a blocking queue to send to callers of changedSpecs() - jobCatalog.addListener(new JobSpecListener()); - } - - public StreamingKafkaSpecExecutorInstanceConsumer(Config config, MutableJobCatalog jobCatalog, Logger log) { - this(config, jobCatalog, Optional.of(log)); - } - - /** Constructor with no logging */ - public StreamingKafkaSpecExecutorInstanceConsumer(Config config, MutableJobCatalog jobCatalog) { - this(config, jobCatalog, Optional.<Logger>absent()); - } - - /** - * This method returns job specs receive from Kafka. It will block if there are no job specs. - * @return list of (verb, jobspecs) pairs. - */ - @Override - public Future<? extends List<Pair<Verb, Spec>>> changedSpecs() { - List<Pair<Verb, Spec>> changesSpecs = new ArrayList<>(); - - try { - Pair<Verb, Spec> specPair = _jobSpecQueue.take(); - - do { - changesSpecs.add(specPair); - - // if there are more elements then pass them along in this call - specPair = _jobSpecQueue.poll(); - } while (specPair != null); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - - return new CompletedFuture(changesSpecs, null); - } - - @Override - protected void startUp() { - _jobMonitor.startAsync().awaitRunning(); - } - - @Override - protected void shutDown() { - _jobMonitor.stopAsync().awaitTerminated(); - } - - @Override - public void close() throws IOException { - shutDown(); - } - - /** - * JobCatalog listener that puts messages into a blocking queue for consumption by changedSpecs method of - * {@link StreamingKafkaSpecExecutorInstanceConsumer} - */ - protected class JobSpecListener extends DefaultJobCatalogListenerImpl { - public JobSpecListener() { - super(StreamingKafkaSpecExecutorInstanceConsumer.this._log); - } - - @Override public void onAddJob(JobSpec addedJob) { - super.onAddJob(addedJob); - - try { - _jobSpecQueue.put(new ImmutablePair<Verb, Spec>(Verb.ADD, addedJob)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - @Override public void onDeleteJob(URI deletedJobURI, String deletedJobVersion) { - super.onDeleteJob(deletedJobURI, deletedJobVersion); - try { - JobSpec.Builder jobSpecBuilder = JobSpec.builder(deletedJobURI); - - Properties props = new Properties(); - jobSpecBuilder.withVersion(deletedJobVersion).withConfigAsProperties(props); - - _jobSpecQueue.put(new ImmutablePair<Verb, Spec>(Verb.DELETE, jobSpecBuilder.build())); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - @Override public void onUpdateJob(JobSpec updatedJob) { - super.onUpdateJob(updatedJob); - - try { - _jobSpecQueue.put(new ImmutablePair<Verb, Spec>(Verb.UPDATE, updatedJob)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceTest.java deleted file mode 100644 index 58c5d72..0000000 --- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorInstanceTest.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.service; - -import java.net.URI; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import lombok.extern.slf4j.Slf4j; - -import org.apache.commons.lang3.tuple.Pair; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.AfterSuite; -import org.testng.annotations.Test; - -import com.google.common.io.Closer; -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys; -import org.apache.gobblin.metrics.reporter.KafkaTestBase; -import org.apache.gobblin.runtime.api.JobSpec; -import org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.runtime.api.SpecExecutorInstance; -import org.apache.gobblin.util.ConfigUtils; -import org.apache.gobblin.writer.WriteResponse; - - -@Slf4j -public class SimpleKafkaSpecExecutorInstanceTest extends KafkaTestBase { - - public static final String TOPIC = SimpleKafkaSpecExecutorInstanceTest.class.getSimpleName(); - - private Closer _closer; - private Properties _properties; - private SimpleKafkaSpecExecutorInstanceProducer _seip; - private SimpleKafkaSpecExecutorInstanceConsumer _seic; - private String _kafkaBrokers; - - public SimpleKafkaSpecExecutorInstanceTest() - throws InterruptedException, RuntimeException { - super(TOPIC); - _kafkaBrokers = "localhost:" + kafkaPort; - log.info("Going to use Kakfa broker: " + _kafkaBrokers); - } - - @Test - public void testAddSpec() throws Exception { - _closer = Closer.create(); - _properties = new Properties(); - - // Properties for Producer - _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, TOPIC); - _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers", _kafkaBrokers); - _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); - - // Properties for Consumer - _properties.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers); - _properties.setProperty(SimpleKafkaSpecExecutorInstanceProducer.SPEC_KAFKA_TOPICS_KEY, TOPIC); - - // SEI Producer - _seip = _closer.register(new SimpleKafkaSpecExecutorInstanceProducer(ConfigUtils.propertiesToConfig(_properties))); - - String addedSpecUriString = "/foo/bar/addedSpec"; - Spec spec = initJobSpec(addedSpecUriString); - WriteResponse writeResponse = (WriteResponse) _seip.addSpec(spec).get(); - log.info("WriteResponse: " + writeResponse); - - try { - Thread.sleep(1000); - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } - - _seic = _closer.register(new SimpleKafkaSpecExecutorInstanceConsumer(ConfigUtils.propertiesToConfig(_properties))); - - List<Pair<SpecExecutorInstance.Verb, Spec>> consumedEvent = _seic.changedSpecs().get(); - Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production"); - - Map.Entry<SpecExecutorInstance.Verb, Spec> consumedSpecAction = consumedEvent.get(0); - Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutorInstance.Verb.ADD), "Verb did not match"); - Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(addedSpecUriString), "Expected URI did not match"); - Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec"); - } - - @Test (dependsOnMethods = "testAddSpec") - public void testUpdateSpec() throws Exception { - String updatedSpecUriString = "/foo/bar/updatedSpec"; - Spec spec = initJobSpec(updatedSpecUriString); - WriteResponse writeResponse = (WriteResponse) _seip.updateSpec(spec).get(); - log.info("WriteResponse: " + writeResponse); - - try { - Thread.sleep(1000); - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } - - List<Pair<SpecExecutorInstance.Verb, Spec>> consumedEvent = _seic.changedSpecs().get(); - Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production"); - - Map.Entry<SpecExecutorInstance.Verb, Spec> consumedSpecAction = consumedEvent.get(0); - Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutorInstance.Verb.UPDATE), "Verb did not match"); - Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(updatedSpecUriString), "Expected URI did not match"); - Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec"); - } - - @Test (dependsOnMethods = "testUpdateSpec") - public void testDeleteSpec() throws Exception { - String deletedSpecUriString = "/foo/bar/deletedSpec"; - WriteResponse writeResponse = (WriteResponse) _seip.deleteSpec(new URI(deletedSpecUriString)).get(); - log.info("WriteResponse: " + writeResponse); - - try { - Thread.sleep(1000); - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } - - List<Pair<SpecExecutorInstance.Verb, Spec>> consumedEvent = _seic.changedSpecs().get(); - Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production"); - - Map.Entry<SpecExecutorInstance.Verb, Spec> consumedSpecAction = consumedEvent.get(0); - Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutorInstance.Verb.DELETE), "Verb did not match"); - Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(deletedSpecUriString), "Expected URI did not match"); - Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec"); - } - - @Test (dependsOnMethods = "testDeleteSpec") - public void testResetConsumption() throws Exception { - SimpleKafkaSpecExecutorInstanceConsumer seic = _closer - .register(new SimpleKafkaSpecExecutorInstanceConsumer(ConfigUtils.propertiesToConfig(_properties))); - - List<Pair<SpecExecutorInstance.Verb, Spec>> consumedEvent = seic.changedSpecs().get(); - Assert.assertTrue(consumedEvent.size() == 3, "Consumption was reset, we should see all events"); - } - - private JobSpec initJobSpec(String specUri) { - Properties properties = new Properties(); - return JobSpec.builder(specUri) - .withConfig(ConfigUtils.propertiesToConfig(properties)) - .withVersion("1") - .withDescription("Spec Description") - .build(); - } - - @AfterClass - public void after() { - try { - _closer.close(); - } catch(Exception e) { - log.error("Failed to close SEIC and SEIP.", e); - } - try { - close(); - } catch(Exception e) { - log.error("Failed to close Kafka server.", e); - } - } - - @AfterSuite - public void afterSuite() { - closeServer(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorTest.java new file mode 100644 index 0000000..5567c18 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/SimpleKafkaSpecExecutorTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service; + +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import com.google.common.io.Closer; + +import org.apache.commons.lang3.tuple.Pair; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterSuite; +import org.testng.annotations.Test; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys; +import org.apache.gobblin.metrics.reporter.KafkaTestBase; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.writer.WriteResponse; +import org.apache.gobblin.runtime.api.SpecExecutor; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class SimpleKafkaSpecExecutorTest extends KafkaTestBase { + + public static final String TOPIC = SimpleKafkaSpecExecutorTest.class.getSimpleName(); + + private Closer _closer; + private Properties _properties; + private SimpleKafkaSpecProducer _seip; + private SimpleKafkaSpecConsumer _seic; + private String _kafkaBrokers; + + public SimpleKafkaSpecExecutorTest() + throws InterruptedException, RuntimeException { + super(TOPIC); + _kafkaBrokers = "localhost:" + kafkaPort; + log.info("Going to use Kakfa broker: " + _kafkaBrokers); + } + + @Test + public void testAddSpec() throws Exception { + _closer = Closer.create(); + _properties = new Properties(); + + // Properties for Producer + _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, TOPIC); + _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers", _kafkaBrokers); + _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + + // Properties for Consumer + _properties.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers); + _properties.setProperty(SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY, TOPIC); + + // SEI Producer + _seip = _closer.register(new SimpleKafkaSpecProducer(ConfigUtils.propertiesToConfig(_properties))); + + String addedSpecUriString = "/foo/bar/addedSpec"; + Spec spec = initJobSpec(addedSpecUriString); + WriteResponse writeResponse = (WriteResponse) _seip.addSpec(spec).get(); + log.info("WriteResponse: " + writeResponse); + + try { + Thread.sleep(1000); + } catch(InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + _seic = _closer.register(new SimpleKafkaSpecConsumer(ConfigUtils.propertiesToConfig(_properties))); + + List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get(); + Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production"); + + Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0); + Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match"); + Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(addedSpecUriString), "Expected URI did not match"); + Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec"); + } + + @Test (dependsOnMethods = "testAddSpec") + public void testUpdateSpec() throws Exception { + String updatedSpecUriString = "/foo/bar/updatedSpec"; + Spec spec = initJobSpec(updatedSpecUriString); + WriteResponse writeResponse = (WriteResponse) _seip.updateSpec(spec).get(); + log.info("WriteResponse: " + writeResponse); + + try { + Thread.sleep(1000); + } catch(InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get(); + Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production"); + + Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0); + Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.UPDATE), "Verb did not match"); + Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(updatedSpecUriString), "Expected URI did not match"); + Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec"); + } + + @Test (dependsOnMethods = "testUpdateSpec") + public void testDeleteSpec() throws Exception { + String deletedSpecUriString = "/foo/bar/deletedSpec"; + WriteResponse writeResponse = (WriteResponse) _seip.deleteSpec(new URI(deletedSpecUriString)).get(); + log.info("WriteResponse: " + writeResponse); + + try { + Thread.sleep(1000); + } catch(InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get(); + Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production"); + + Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0); + Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match"); + Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(deletedSpecUriString), "Expected URI did not match"); + Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec"); + } + + @Test (dependsOnMethods = "testDeleteSpec") + public void testResetConsumption() throws Exception { + SimpleKafkaSpecConsumer seic = _closer + .register(new SimpleKafkaSpecConsumer(ConfigUtils.propertiesToConfig(_properties))); + + List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = seic.changedSpecs().get(); + Assert.assertTrue(consumedEvent.size() == 3, "Consumption was reset, we should see all events"); + } + + private JobSpec initJobSpec(String specUri) { + Properties properties = new Properties(); + return JobSpec.builder(specUri) + .withConfig(ConfigUtils.propertiesToConfig(properties)) + .withVersion("1") + .withDescription("Spec Description") + .build(); + } + + @AfterClass + public void after() { + try { + _closer.close(); + } catch(Exception e) { + log.error("Failed to close SEIC and SEIP.", e); + } + try { + close(); + } catch(Exception e) { + log.error("Failed to close Kafka server.", e); + } + } + + @AfterSuite + public void afterSuite() { + closeServer(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorInstanceTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorInstanceTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorInstanceTest.java deleted file mode 100644 index 939aafa..0000000 --- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorInstanceTest.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.service; - -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.AfterSuite; -import org.testng.annotations.Test; - -import com.google.common.io.Closer; -import com.typesafe.config.Config; - -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys; -import org.apache.gobblin.metrics.reporter.KafkaTestBase; -import org.apache.gobblin.runtime.api.JobSpec; -import org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.runtime.api.SpecExecutorInstance; -import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog; -import org.apache.gobblin.util.ConfigUtils; -import org.apache.gobblin.writer.WriteResponse; -import lombok.extern.slf4j.Slf4j; - - -@Slf4j -public class StreamingKafkaSpecExecutorInstanceTest extends KafkaTestBase { - - public static final String TOPIC = StreamingKafkaSpecExecutorInstanceTest.class.getSimpleName(); - - private Closer _closer; - private Properties _properties; - private SimpleKafkaSpecExecutorInstanceProducer _seip; - private StreamingKafkaSpecExecutorInstanceConsumer _seic; - private NonObservingFSJobCatalog _jobCatalog; - private String _kafkaBrokers; - private static final String _TEST_DIR_PATH = "/tmp/StreamingKafkaSpecExecutorInstanceTest"; - private static final String _JOBS_DIR_PATH = _TEST_DIR_PATH + "/jobs"; - - public StreamingKafkaSpecExecutorInstanceTest() - throws InterruptedException, RuntimeException { - super(TOPIC); - _kafkaBrokers = "localhost:" + kafkaPort; - log.info("Going to use Kakfa broker: " + _kafkaBrokers); - - cleanupTestDir(); - } - - private void cleanupTestDir() { - File testDir = new File(_TEST_DIR_PATH); - - if (testDir.exists()) { - try { - FileUtils.deleteDirectory(testDir); - } catch (IOException e) { - throw new RuntimeException("Could not delete test directory", e); - } - } - } - - @Test - public void testAddSpec() throws Exception { - _closer = Closer.create(); - _properties = new Properties(); - - // Properties for Producer - _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, TOPIC); - _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers", _kafkaBrokers); - _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); - - // Properties for Consumer - _properties.setProperty("jobSpecMonitor.kafka.zookeeper.connect", zkConnect); - _properties.setProperty(SimpleKafkaSpecExecutorInstanceProducer.SPEC_KAFKA_TOPICS_KEY, TOPIC); - _properties.setProperty("gobblin.cluster.jobconf.fullyQualifiedPath", _JOBS_DIR_PATH); - - Config config = ConfigUtils.propertiesToConfig(_properties); - - // SEI Producer - _seip = _closer.register(new SimpleKafkaSpecExecutorInstanceProducer(config)); - - String addedSpecUriString = "/foo/bar/addedSpec"; - Spec spec = initJobSpec(addedSpecUriString); - WriteResponse writeResponse = (WriteResponse) _seip.addSpec(spec).get(); - log.info("WriteResponse: " + writeResponse); - - _jobCatalog = new NonObservingFSJobCatalog(config.getConfig("gobblin.cluster")); - _jobCatalog.startAsync().awaitRunning(); - - // SEI Consumer - _seic = _closer.register(new StreamingKafkaSpecExecutorInstanceConsumer(config, _jobCatalog)); - _seic.startAsync().awaitRunning(); - - List<Pair<SpecExecutorInstance.Verb, Spec>> consumedEvent = _seic.changedSpecs().get(); - Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production"); - - Map.Entry<SpecExecutorInstance.Verb, Spec> consumedSpecAction = consumedEvent.get(0); - Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutorInstance.Verb.ADD), "Verb did not match"); - Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(addedSpecUriString), "Expected URI did not match"); - Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec"); - } - - @Test (dependsOnMethods = "testAddSpec") - public void testUpdateSpec() throws Exception { - // update is only treated as an update for existing job specs - String updatedSpecUriString = "/foo/bar/addedSpec"; - Spec spec = initJobSpec(updatedSpecUriString); - WriteResponse writeResponse = (WriteResponse) _seip.updateSpec(spec).get(); - log.info("WriteResponse: " + writeResponse); - - List<Pair<SpecExecutorInstance.Verb, Spec>> consumedEvent = _seic.changedSpecs().get(); - Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production"); - - Map.Entry<SpecExecutorInstance.Verb, Spec> consumedSpecAction = consumedEvent.get(0); - Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutorInstance.Verb.UPDATE), "Verb did not match"); - Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(updatedSpecUriString), "Expected URI did not match"); - Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec"); - } - - @Test (dependsOnMethods = "testUpdateSpec") - public void testDeleteSpec() throws Exception { - // delete needs to be on a job spec that exists to get notification - String deletedSpecUriString = "/foo/bar/addedSpec"; - WriteResponse writeResponse = (WriteResponse) _seip.deleteSpec(new URI(deletedSpecUriString)).get(); - log.info("WriteResponse: " + writeResponse); - - List<Pair<SpecExecutorInstance.Verb, Spec>> consumedEvent = _seic.changedSpecs().get(); - Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production"); - - Map.Entry<SpecExecutorInstance.Verb, Spec> consumedSpecAction = consumedEvent.get(0); - Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutorInstance.Verb.DELETE), "Verb did not match"); - Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(deletedSpecUriString), "Expected URI did not match"); - Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec"); - } - - private JobSpec initJobSpec(String specUri) { - Properties properties = new Properties(); - return JobSpec.builder(specUri) - .withConfig(ConfigUtils.propertiesToConfig(properties)) - .withVersion("1") - .withDescription("Spec Description") - .build(); - } - - @AfterClass - public void after() { - try { - _closer.close(); - } catch(Exception e) { - log.error("Failed to close SEIC and SEIP.", e); - } - try { - close(); - } catch(Exception e) { - log.error("Failed to close Kafka server.", e); - } - - if (_jobCatalog != null) { - _jobCatalog.stopAsync().awaitTerminated(); - } - - cleanupTestDir(); - } - - @AfterSuite - public void afterSuite() { - closeServer(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java new file mode 100644 index 0000000..e9c7ee6 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterSuite; +import org.testng.annotations.Test; + +import com.google.common.io.Closer; +import com.typesafe.config.Config; + +import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys; +import org.apache.gobblin.metrics.reporter.KafkaTestBase; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.writer.WriteResponse; +import org.apache.gobblin.runtime.api.SpecExecutor; + +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class StreamingKafkaSpecExecutorTest extends KafkaTestBase { + + public static final String TOPIC = StreamingKafkaSpecExecutorTest.class.getSimpleName(); + + private Closer _closer; + private Properties _properties; + private SimpleKafkaSpecProducer _seip; + private StreamingKafkaSpecConsumer _seic; + private NonObservingFSJobCatalog _jobCatalog; + private String _kafkaBrokers; + private static final String _TEST_DIR_PATH = "/tmp/StreamingKafkaSpecExecutorTest"; + private static final String _JOBS_DIR_PATH = _TEST_DIR_PATH + "/jobs"; + + public StreamingKafkaSpecExecutorTest() + throws InterruptedException, RuntimeException { + super(TOPIC); + _kafkaBrokers = "localhost:" + kafkaPort; + log.info("Going to use Kakfa broker: " + _kafkaBrokers); + + cleanupTestDir(); + } + + private void cleanupTestDir() { + File testDir = new File(_TEST_DIR_PATH); + + if (testDir.exists()) { + try { + FileUtils.deleteDirectory(testDir); + } catch (IOException e) { + throw new RuntimeException("Could not delete test directory", e); + } + } + } + + @Test + public void testAddSpec() throws Exception { + _closer = Closer.create(); + _properties = new Properties(); + + // Properties for Producer + _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, TOPIC); + _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers", _kafkaBrokers); + _properties.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX+"value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + + // Properties for Consumer + _properties.setProperty("jobSpecMonitor.kafka.zookeeper.connect", zkConnect); + _properties.setProperty(SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY, TOPIC); + _properties.setProperty("gobblin.cluster.jobconf.fullyQualifiedPath", _JOBS_DIR_PATH); + + Config config = ConfigUtils.propertiesToConfig(_properties); + + // SEI Producer + _seip = _closer.register(new SimpleKafkaSpecProducer(config)); + + String addedSpecUriString = "/foo/bar/addedSpec"; + Spec spec = initJobSpec(addedSpecUriString); + WriteResponse writeResponse = (WriteResponse) _seip.addSpec(spec).get(); + log.info("WriteResponse: " + writeResponse); + + _jobCatalog = new NonObservingFSJobCatalog(config.getConfig("gobblin.cluster")); + _jobCatalog.startAsync().awaitRunning(); + + // SEI Consumer + _seic = _closer.register(new StreamingKafkaSpecConsumer(config, _jobCatalog)); + _seic.startAsync().awaitRunning(); + + List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get(); + Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production"); + + Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0); + Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match"); + Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(addedSpecUriString), "Expected URI did not match"); + Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec"); + } + + @Test (dependsOnMethods = "testAddSpec") + public void testUpdateSpec() throws Exception { + // update is only treated as an update for existing job specs + String updatedSpecUriString = "/foo/bar/addedSpec"; + Spec spec = initJobSpec(updatedSpecUriString); + WriteResponse writeResponse = (WriteResponse) _seip.updateSpec(spec).get(); + log.info("WriteResponse: " + writeResponse); + + List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get(); + Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production"); + + Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0); + Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.UPDATE), "Verb did not match"); + Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(updatedSpecUriString), "Expected URI did not match"); + Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec"); + } + + @Test (dependsOnMethods = "testUpdateSpec") + public void testDeleteSpec() throws Exception { + // delete needs to be on a job spec that exists to get notification + String deletedSpecUriString = "/foo/bar/addedSpec"; + WriteResponse writeResponse = (WriteResponse) _seip.deleteSpec(new URI(deletedSpecUriString)).get(); + log.info("WriteResponse: " + writeResponse); + + List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get(); + Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production"); + + Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0); + Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match"); + Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(deletedSpecUriString), "Expected URI did not match"); + Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec"); + } + + private JobSpec initJobSpec(String specUri) { + Properties properties = new Properties(); + return JobSpec.builder(specUri) + .withConfig(ConfigUtils.propertiesToConfig(properties)) + .withVersion("1") + .withDescription("Spec Description") + .build(); + } + + @AfterClass + public void after() { + try { + _closer.close(); + } catch(Exception e) { + log.error("Failed to close SEIC and SEIP.", e); + } + try { + close(); + } catch(Exception e) { + log.error("Failed to close Kafka server.", e); + } + + if (_jobCatalog != null) { + _jobCatalog.stopAsync().awaitTerminated(); + } + + cleanupTestDir(); + } + + @AfterSuite + public void afterSuite() { + closeServer(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCompiler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCompiler.java index bb617a7..aceb5dd 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCompiler.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCompiler.java @@ -24,16 +24,16 @@ import org.apache.gobblin.instrumented.Instrumentable; /*** * Take in a logical {@link Spec} and compile corresponding materialized {@link Spec}s - * and the mapping to {@link SpecExecutorInstance} that they can be run on. + * and the mapping to {@link SpecExecutor} that they can be run on. */ public interface SpecCompiler extends SpecCatalogListener, Instrumentable { /*** * Take in a logical {@link Spec} and compile corresponding materialized {@link Spec}s - * and the mapping to {@link SpecExecutorInstance} that they can be run on. + * and the mapping to {@link SpecExecutor} that they can be run on. * @param spec {@link Spec} to compile. - * @return Map of materialized physical {@link Spec} and {@link SpecExecutorInstance}. + * @return Map of materialized physical {@link Spec} and {@link SpecExecutor}. */ - Map<Spec, SpecExecutorInstanceProducer> compileFlow(Spec spec); + Map<Spec, SpecExecutor> compileFlow(Spec spec); /*** * Map of {@link Spec} URI and {@link TopologySpec} the {@link SpecCompiler} @@ -41,4 +41,4 @@ public interface SpecCompiler extends SpecCatalogListener, Instrumentable { * @return Map of {@link Spec} URI and {@link TopologySpec} */ Map<URI, TopologySpec> getTopologySpecMap(); -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TopologySpec.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TopologySpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TopologySpec.java index da89b9e..c33f3de 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TopologySpec.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TopologySpec.java @@ -25,26 +25,25 @@ import javax.annotation.concurrent.NotThreadSafe; import edu.umd.cs.findbugs.annotations.SuppressWarnings; -import lombok.AllArgsConstructor; -import lombok.Data; - -import org.apache.commons.lang3.reflect.ConstructorUtils; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import org.apache.commons.lang3.reflect.ConstructorUtils; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutorInstanceProducer; import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; + +import lombok.AllArgsConstructor; +import lombok.Data; /** - * Data model representation that describes a topology ie. a {@link SpecExecutorInstance} and its + * Data model representation that describes a topology ie. a {@link SpecExecutor} and its * capabilities tuple . * */ @@ -53,8 +52,8 @@ import org.apache.gobblin.util.ConfigUtils; @AllArgsConstructor @NotThreadSafe public class TopologySpec implements Configurable, Spec { - public static final String DEFAULT_SPEC_EXECUTOR_INSTANCE_PRODUCER = InMemorySpecExecutorInstanceProducer.class.getCanonicalName(); - public static final String SPEC_EXECUTOR_INSTANCE_PRODUCER_KEY = "specExecutorInstanceProducer.class"; + public static final String DEFAULT_SPEC_EXECUTOR_INSTANCE = InMemorySpecExecutor.class.getCanonicalName(); + public static final String SPEC_EXECUTOR_INSTANCE_KEY = "specExecutorInstance.class"; private static final long serialVersionUID = 6106269076155338046L; @@ -78,26 +77,29 @@ public class TopologySpec implements Configurable, Spec { /** Underlying executor instance such as Gobblin cluster or Azkaban */ @SuppressWarnings(justification="Initialization handled by getter", value="SE_TRANSIENT_FIELD_NOT_RESTORED") - transient SpecExecutorInstanceProducer specExecutorInstanceProducer; + transient SpecExecutor specExecutorInstance; - public SpecExecutorInstanceProducer getSpecExecutorInstanceProducer() { - if (null == specExecutorInstanceProducer) { - String specExecutorInstanceProducerClass = DEFAULT_SPEC_EXECUTOR_INSTANCE_PRODUCER; - if (config.hasPath(SPEC_EXECUTOR_INSTANCE_PRODUCER_KEY)) { - specExecutorInstanceProducerClass = config.getString(SPEC_EXECUTOR_INSTANCE_PRODUCER_KEY); + /** + * @return A {@link SpecExecutor}'s instance defined by <Technology, Location, Communication Mechanism> + */ + public synchronized SpecExecutor getSpecExecutor() { + if (null == specExecutorInstance) { + String specExecutorClass = DEFAULT_SPEC_EXECUTOR_INSTANCE; + if (config.hasPath(SPEC_EXECUTOR_INSTANCE_KEY)) { + specExecutorClass = config.getString(SPEC_EXECUTOR_INSTANCE_KEY); } try { - ClassAliasResolver<SpecExecutorInstanceProducer> _aliasResolver = - new ClassAliasResolver<>(SpecExecutorInstanceProducer.class); - specExecutorInstanceProducer = (SpecExecutorInstanceProducer) ConstructorUtils + ClassAliasResolver<SpecExecutor> _aliasResolver = + new ClassAliasResolver<>(SpecExecutor.class); + specExecutorInstance = (SpecExecutor) ConstructorUtils .invokeConstructor(Class.forName(_aliasResolver - .resolve(specExecutorInstanceProducerClass)), config); + .resolve(specExecutorClass)), config); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException | ClassNotFoundException e) { throw new RuntimeException(e); } } - return specExecutorInstanceProducer; + return specExecutorInstance; } public static TopologySpec.Builder builder(URI topologySpecUri) { @@ -164,7 +166,7 @@ public class TopologySpec implements Configurable, Spec { private String version = "1"; private Optional<String> description = Optional.absent(); private Optional<URI> topologyCatalogURI = Optional.absent(); - private Optional<SpecExecutorInstanceProducer> specExecutorInstanceProducer = Optional.absent(); + private Optional<SpecExecutor> specExecutorInstance = Optional.absent(); public Builder(URI topologySpecUri) { Preconditions.checkNotNull(topologySpecUri); @@ -189,9 +191,9 @@ public class TopologySpec implements Configurable, Spec { public TopologySpec build() { Preconditions.checkNotNull(this.uri); Preconditions.checkNotNull(this.version); + return new TopologySpec(getURI(), getVersion(), getDescription(), getConfig(), getConfigAsProperties(), + getSpecExceutorInstance()); - return new TopologySpec(getURI(), getVersion(), getDescription(), getConfig(), - getConfigAsProperties(), getSpecExceutorInstanceProducer()); } /** The scheme and authority of the topology catalog URI are used to generate TopologySpec URIs from @@ -314,17 +316,17 @@ public class TopologySpec implements Configurable, Spec { return this; } - public SpecExecutorInstanceProducer getSpecExceutorInstanceProducer() { - if (!this.specExecutorInstanceProducer.isPresent()) { - // TODO: Try to init SpecExecutorInstanceProducer from config if not initialized via builder. - throw new RuntimeException("SpecExecutorInstanceProducer not initialized."); + public SpecExecutor getSpecExceutorInstance() { + if (!this.specExecutorInstance.isPresent()) { + // TODO: Try to init SpecProducer from config if not initialized via builder. + throw new RuntimeException("SpecExecutor not initialized."); } - return this.specExecutorInstanceProducer.get(); + return this.specExecutorInstance.get(); } - public TopologySpec.Builder withSpecExecutorInstanceProducer(SpecExecutorInstanceProducer specExecutorInstanceProducer) { - Preconditions.checkNotNull(specExecutorInstanceProducer); - this.specExecutorInstanceProducer = Optional.of(specExecutorInstanceProducer); + public TopologySpec.Builder withSpecExecutor(SpecExecutor specExecutor) { + Preconditions.checkNotNull(specExecutor); + this.specExecutorInstance = Optional.of(specExecutor); return this; } } @@ -337,4 +339,4 @@ public class TopologySpec implements Configurable, Spec { return this.uri; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java index 1f2ce21..59733d3 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java @@ -21,41 +21,25 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Collection; -import java.util.Map; import java.util.Properties; -import java.util.regex.Pattern; -import org.apache.hadoop.fs.Path; - -import com.codahale.metrics.Counter; -import com.google.common.base.Charsets; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigValue; -import org.apache.gobblin.metrics.GobblinTrackingEvent; -import org.apache.gobblin.metrics.event.sla.SlaEventKeys; import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter; -import org.apache.gobblin.metrics.reporter.util.NoopSchemaVersionWriter; import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter; import org.apache.gobblin.runtime.api.GobblinInstanceDriver; import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.JobSpecMonitor; import org.apache.gobblin.runtime.api.JobSpecMonitorFactory; import org.apache.gobblin.runtime.api.MutableJobCatalog; -import org.apache.gobblin.runtime.api.SpecExecutorInstance; -import org.apache.gobblin.runtime.api.SpecExecutorInstance.Verb; +import org.apache.gobblin.runtime.api.SpecExecutor.Verb; import org.apache.gobblin.runtime.job_spec.AvroJobSpec; -import org.apache.gobblin.runtime.metrics.RuntimeMetrics; import org.apache.gobblin.util.Either; -import org.apache.gobblin.util.PathUtils; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; -import kafka.message.MessageAndMetadata; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -153,4 +137,4 @@ public class AvroJobSpecKafkaJobMonitor extends KafkaAvroJobMonitor<AvroJobSpec> return Lists.newArrayList(Either.<JobSpec, URI>right(jobSpec.getUri())); } } -} +} \ No newline at end of file
