Added Kafka Avro serializer with unit tests. Added Kafka task with unit test
Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/68c341bd Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/68c341bd Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/68c341bd Branch: refs/heads/master Commit: 68c341bddee77a8d9f873b18a4661482dc4b190f Parents: 2a4bec9 Author: Jakub Jankowski <[email protected]> Authored: Fri Apr 28 17:08:29 2017 +0200 Committer: nkourtellis <[email protected]> Committed: Fri Jul 21 21:12:18 2017 +0300 ---------------------------------------------------------------------- .../samoa/streams/kafka/KafkaAvroMapper.java | 158 +++++++++++++++++++ .../apache/samoa/streams/kafka/KafkaTask.java | 148 +++++++++++++++++ .../kafka/topology/SimpleComponentFactory.java | 53 +++++++ .../streams/kafka/topology/SimpleEngine.java | 37 +++++ .../topology/SimpleEntranceProcessingItem.java | 33 ++++ .../kafka/topology/SimpleProcessingItem.java | 87 ++++++++++ .../streams/kafka/topology/SimpleStream.java | 95 +++++++++++ .../streams/kafka/topology/SimpleTopology.java | 46 ++++++ samoa-api/src/main/resources/kafka.avsc | 61 +++++++ .../kafka/KafkaEntranceProcessorTest.java | 74 +++++++-- .../samoa/streams/kafka/KafkaTaskTest.java | 138 ++++++++++++++++ .../samoa/streams/kafka/KafkaUtilsTest.java | 24 +-- .../samoa/streams/kafka/TestUtilsForKafka.java | 25 ++- 13 files changed, 952 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java new file mode 100644 index 0000000..91902d0 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java @@ -0,0 +1,158 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.specific.SpecificRecord; +import org.apache.samoa.learners.InstanceContentEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * Sample class for serializing and deserializing {@link InstanceContentEvent} + * from/to Avro format + * + * @author Jakub Jankowski + * @version 0.5.0-incubating-SNAPSHOT + * @since 0.5.0-incubating + */ +public class KafkaAvroMapper implements KafkaDeserializer<InstanceContentEvent>, KafkaSerializer<InstanceContentEvent> { + + private static Logger logger = LoggerFactory.getLogger(KafkaAvroMapper.class); + + @Override + public byte[] serialize(InstanceContentEvent message) { + return toBytesGeneric(InstanceContentEvent.class, message); + } + + @Override + public InstanceContentEvent deserialize(byte[] message) { + return avroDeserialize(message, InstanceContentEvent.class, null); + } + + public static <T> byte[] avroSerialize(Class<T> clazz, Object object) { + byte[] ret = null; + try { + if (object == null || !(object instanceof SpecificRecord)) { + return null; + } + + T record = (T) object; + ByteArrayOutputStream out = new ByteArrayOutputStream(); + Encoder e = EncoderFactory.get().directBinaryEncoder(out, null); + SpecificDatumWriter<T> w = new SpecificDatumWriter<T>(clazz); + w.write(record, e); + e.flush(); + ret = out.toByteArray(); + } catch (IOException e) { + + } + + return ret; + } + + public static <T> T avroDeserialize(byte[] avroBytes, Class<T> clazz, Schema schema) { + T ret = null; + try { + ByteArrayInputStream in = new ByteArrayInputStream(avroBytes); + Decoder d = DecoderFactory.get().directBinaryDecoder(in, null); + SpecificDatumReader<T> reader = new SpecificDatumReader<T>(clazz); + ret = reader.read(null, d); + } catch (IOException e) { + + } + + return ret; + } + + public static <V> byte[] toBytesGeneric(final Class<V> cls, final V v) { + final ByteArrayOutputStream bout = new ByteArrayOutputStream(); + final Schema schema = ReflectData.AllowNull.get().getSchema(cls); + final DatumWriter<V> writer = new ReflectDatumWriter<V>(schema); + final BinaryEncoder binEncoder = EncoderFactory.get().binaryEncoder(bout, null); + try { + writer.write(v, binEncoder); + binEncoder.flush(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + + return bout.toByteArray(); + } + + public static <V> byte[] avroBurrSerialize(final Class<V> cls, final V v) { + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + try { + Schema schema = new Schema.Parser().parse(new File("C:/java/avro/kafka.avsc")); + DatumWriter<V> writer; + + if (v instanceof SpecificRecord) { + writer = new SpecificDatumWriter<>(schema); + } else { + writer = new ReflectDatumWriter<>(schema); + } + + BinaryEncoder binEncoder = EncoderFactory.get().binaryEncoder(bout, null); + writer.write(v, binEncoder); + binEncoder.flush(); + + } catch (IOException e) { + e.printStackTrace(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + + return bout.toByteArray(); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java new file mode 100644 index 0000000..26012f2 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java @@ -0,0 +1,148 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Properties; + +import org.apache.samoa.tasks.Task; +import org.apache.samoa.topology.ComponentFactory; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.Topology; +import org.apache.samoa.topology.TopologyBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.javacliparser.Configurable; +import com.github.javacliparser.IntOption; +import com.github.javacliparser.StringOption; + +/** + * Kafka task + * + * @author Jakub Jankowski + * @version 0.5.0-incubating-SNAPSHOT + * @since 0.5.0-incubating + * + */ + +public class KafkaTask implements Task, Configurable { + + private static final long serialVersionUID = 3984474041982397855L; + private static Logger logger = LoggerFactory.getLogger(KafkaTask.class); + + //czy identyczne dla enterance i destination? + Properties producerProps; + Properties consumerProps; + int timeout; + private final KafkaDeserializer deserializer; + private final KafkaSerializer serializer; + private final String topic; + + private TopologyBuilder builder; + private Topology kafkaTopology; + + public IntOption kafkaParallelismOption = new IntOption("parallelismOption", 'p', + "Number of destination Processors", 1, 1, Integer.MAX_VALUE); + + public StringOption evaluationNameOption = new StringOption("evaluationName", 'n', "Identifier of the evaluation", + "KafkaTask" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())); + + /** + * Class constructor + * @param props Properties of Kafka Producer and Consumer + * @see <a href="http://kafka.apache.org/documentation/#producerconfigs">Kafka Producer configuration</a> + * @see <a href="http://kafka.apache.org/documentation/#consumerconfigs">Kafka Consumer configuration</a> + * @param topic Topic to which destination processor will write into + * @param timeout Timeout used when polling Kafka for new messages + * @param serializer Implementation of KafkaSerializer that handles arriving data serialization + * @param serializer Implementation of KafkaDeserializer that handles arriving data deserialization + */ + public KafkaTask(Properties producerProps, Properties consumerProps, String topic, int timeout, KafkaSerializer serializer, KafkaDeserializer deserializer) { + this.producerProps = producerProps; + this.consumerProps = consumerProps; + this.deserializer = deserializer; + this.serializer = serializer; + this.topic = topic; + this.timeout = timeout; + } + + @Override + public void init() { + logger.info("Invoking init"); + if (builder == null) { + builder = new TopologyBuilder(); + logger.info("Successfully instantiating TopologyBuilder"); + + builder.initTopology(evaluationNameOption.getValue()); + logger.info("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); + } + + // create enterance processor + KafkaEntranceProcessor sourceProcessor = new KafkaEntranceProcessor(consumerProps, topic, timeout, deserializer); + builder.addEntranceProcessor(sourceProcessor); + + // create stream + Stream stream = builder.createStream(sourceProcessor); + + // create destination processor + KafkaDestinationProcessor destProcessor = new KafkaDestinationProcessor(producerProps, topic, serializer); + builder.addProcessor(destProcessor, kafkaParallelismOption.getValue()); + builder.connectInputShuffleStream(stream, destProcessor); + + // build topology + kafkaTopology = builder.build(); + logger.info("Successfully built the topology"); + } + + @Override + public Topology getTopology() { + return kafkaTopology; + } + + @Override + public void setFactory(ComponentFactory factory) { + logger.info("Invoking setFactory: "+factory.toString()); + builder = new TopologyBuilder(factory); + logger.info("Successfully instantiating TopologyBuilder"); + + builder.initTopology(evaluationNameOption.getValue()); + logger.info("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java new file mode 100644 index 0000000..155ce1f --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java @@ -0,0 +1,53 @@ +package org.apache.samoa.streams.kafka.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.ComponentFactory; +import org.apache.samoa.topology.EntranceProcessingItem; +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.topology.ProcessingItem; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.Topology; + +public class SimpleComponentFactory implements ComponentFactory { + + public ProcessingItem createPi(Processor processor, int paralellism) { + return new SimpleProcessingItem(processor, paralellism); + } + + public ProcessingItem createPi(Processor processor) { + return this.createPi(processor, 1); + } + + public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) { + return new SimpleEntranceProcessingItem(processor); + } + + public Stream createStream(IProcessingItem sourcePi) { + return new SimpleStream(sourcePi); + } + + public Topology createTopology(String topoName) { + return new SimpleTopology(topoName); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java new file mode 100644 index 0000000..d446018 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java @@ -0,0 +1,37 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package org.apache.samoa.streams.kafka.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.topology.Topology; + +public class SimpleEngine { + + public static void submitTopology(Topology topology) { + SimpleTopology simpleTopology = (SimpleTopology) topology; + simpleTopology.run(); + // runs until completion + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java new file mode 100644 index 0000000..4c626dc --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java @@ -0,0 +1,33 @@ +package org.apache.samoa.streams.kafka.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.topology.LocalEntranceProcessingItem; + +class SimpleEntranceProcessingItem extends LocalEntranceProcessingItem { + public SimpleEntranceProcessingItem(EntranceProcessor processor) { + super(processor); + } + + // The default waiting time when there is no available events is 100ms + // Override waitForNewEvents() to change it +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java new file mode 100644 index 0000000..3549b85 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java @@ -0,0 +1,87 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package org.apache.samoa.streams.kafka.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.AbstractProcessingItem; +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.topology.ProcessingItem; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.utils.PartitioningScheme; +import org.apache.samoa.utils.StreamDestination; + +/** + * + * @author abifet + */ +class SimpleProcessingItem extends AbstractProcessingItem { + private IProcessingItem[] arrayProcessingItem; + + SimpleProcessingItem(Processor processor) { + super(processor); + } + + SimpleProcessingItem(Processor processor, int parallelism) { + super(processor); + this.setParallelism(parallelism); + } + + public IProcessingItem getProcessingItem(int i) { + return arrayProcessingItem[i]; + } + + @Override + protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) { + StreamDestination destination = new StreamDestination(this, this.getParallelism(), scheme); + ((SimpleStream) inputStream).addDestination(destination); + return this; + } + + public SimpleProcessingItem copy() { + Processor processor = this.getProcessor(); + return new SimpleProcessingItem(processor.newProcessor(processor)); + } + + public void processEvent(ContentEvent event, int counter) { + + int parallelism = this.getParallelism(); + // System.out.println("Process event "+event+" (isLast="+event.isLastEvent()+") with counter="+counter+" while parallelism="+parallelism); + if (this.arrayProcessingItem == null && parallelism > 0) { + // Init processing elements, the first time they are needed + this.arrayProcessingItem = new IProcessingItem[parallelism]; + for (int j = 0; j < parallelism; j++) { + arrayProcessingItem[j] = this.copy(); + arrayProcessingItem[j].getProcessor().onCreate(j); + } + } + if (this.arrayProcessingItem != null) { + IProcessingItem pi = this.getProcessingItem(counter); + Processor p = pi.getProcessor(); + // System.out.println("PI="+pi+", p="+p); + this.getProcessingItem(counter).getProcessor().process(event); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java new file mode 100644 index 0000000..269e0cc --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java @@ -0,0 +1,95 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package org.apache.samoa.streams.kafka.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.LinkedList; +import java.util.List; + +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.topology.AbstractStream; +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.utils.StreamDestination; + +/** + * + * @author abifet + */ +class SimpleStream extends AbstractStream { + private List<StreamDestination> destinations; + private int maxCounter; + private int eventCounter; + + SimpleStream(IProcessingItem sourcePi) { + super(sourcePi); + this.destinations = new LinkedList<>(); + this.eventCounter = 0; + this.maxCounter = 1; + } + + private int getNextCounter() { + if (maxCounter > 0 && eventCounter >= maxCounter) + eventCounter = 0; + this.eventCounter++; + return this.eventCounter; + } + + @Override + public void put(ContentEvent event) { + this.put(event, this.getNextCounter()); + } + + private void put(ContentEvent event, int counter) { + SimpleProcessingItem pi; + int parallelism; + for (StreamDestination destination : destinations) { + pi = (SimpleProcessingItem) destination.getProcessingItem(); + parallelism = destination.getParallelism(); + switch (destination.getPartitioningScheme()) { + case SHUFFLE: + pi.processEvent(event, counter % parallelism); + break; + case GROUP_BY_KEY: + HashCodeBuilder hb = new HashCodeBuilder(); + hb.append(event.getKey()); + int key = hb.build() % parallelism; + pi.processEvent(event, key); + break; + case BROADCAST: + for (int p = 0; p < parallelism; p++) { + pi.processEvent(event, p); + } + break; + } + } + } + + public void addDestination(StreamDestination destination) { + this.destinations.add(destination); + if (maxCounter <= 0) + maxCounter = 1; + maxCounter *= destination.getParallelism(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java new file mode 100644 index 0000000..98dd7a5 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java @@ -0,0 +1,46 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package org.apache.samoa.streams.kafka.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.topology.AbstractTopology; + +public class SimpleTopology extends AbstractTopology { + SimpleTopology(String name) { + super(name); + } + + public void run() { + if (this.getEntranceProcessingItems() == null) + throw new IllegalStateException("You need to set entrance PI before running the topology."); + if (this.getEntranceProcessingItems().size() != 1) + throw new IllegalStateException("SimpleTopology supports 1 entrance PI only. Number of entrance PIs is " + + this.getEntranceProcessingItems().size()); + + SimpleEntranceProcessingItem entrancePi = (SimpleEntranceProcessingItem) this.getEntranceProcessingItems() + .toArray()[0]; + entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple mode + entrancePi.startSendingEvents(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/main/resources/kafka.avsc ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/resources/kafka.avsc b/samoa-api/src/main/resources/kafka.avsc new file mode 100644 index 0000000..c21e153 --- /dev/null +++ b/samoa-api/src/main/resources/kafka.avsc @@ -0,0 +1,61 @@ +[ +{ + "type": "record", + "name": "InstanceData", + "fields": [ + ] +}, +{ + "type": "record", + "name": "SingleClassInstanceData", + "fields": [ + {"name":"classValue", "type": "double"} + ] +}, +{ + "type": "record", + "name": "DenseInstanceData", + "fields": [ + {"name":"attributeValues", "type": {"type": "array", "items": "double"}} + ] +}, +{ + "type": "record", + "name": "SparseInstanceData", + "fields": [ + {"name":"attributeValues", "type": {"type": "array", "items": "double"}}, + {"name":"indexValues", "type": {"type": "array", "items": "int"}}, + {"name":"numberAttributes", "type": "int"} + ] +}, +{ + "type": "record", + "name": "SerializableInstance", + "fields": [ + {"name": "weight", "type": "double"}, + {"name": "instanceData", "type": ["null", "InstanceData", "DenseInstanceData", "SparseInstanceData", "SingleClassInstanceData"]}, + {"name": "classData", "type": "InstanceData"} + ] +}, +{ + "type": "record", + "name": "InstanceContent", + "fields": [ + {"name": "instanceIndex", "type": "long"}, + {"name": "classifierIndex", "type": "int"}, + {"name": "evaluationIndex", "type": "int"}, + {"name":"instance", "type":"SerializableInstance"}, + {"name": "isTraining", "type": "boolean"}, + {"name": "isTesting", "type": "boolean"}, + {"name": "isLast", "type": "boolean"} + ] +}, +{ + "type": "record", + "name": "InstanceContentEvent", + "fields": [ + {"name": "instanceContent", "type": "InstanceContent"} + ] +} +] + http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java index 2a92a31..3da9d6f 100644 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java @@ -56,6 +56,7 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import static org.junit.Assert.*; import kafka.admin.AdminUtils; @@ -84,15 +85,16 @@ import org.apache.samoa.streams.InstanceStream; * * @author pwawrzyniak */ +//@Ignore public class KafkaEntranceProcessorTest { // @Tested // private KafkaEntranceProcessor kep; - private static final String ZKHOST = "127.0.0.1"; - private static final String BROKERHOST = "127.0.0.1"; - private static final String BROKERPORT = "9092"; - private static final String TOPIC = "test"; - private static final int NUM_INSTANCES = 500; + private static final String ZKHOST = "10.255.251.202"; //10.255.251.202 + private static final String BROKERHOST = "10.255.251.214"; //10.255.251.214 + private static final String BROKERPORT = "6667"; //6667, local: 9092 + private static final String TOPIC = "samoa_test"; //samoa_test, local: test + private static final int NUM_INSTANCES = 50; private static KafkaServer kafkaServer; @@ -108,28 +110,28 @@ public class KafkaEntranceProcessorTest { public static void setUpClass() throws IOException { // setup Zookeeper zkServer = new EmbeddedZookeeper(); - zkConnect = ZKHOST + ":" + zkServer.port(); + zkConnect = ZKHOST + ":" + "2181"; //+ zkServer.port(); zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); ZkUtils zkUtils = ZkUtils.apply(zkClient, false); // setup Broker - Properties brokerProps = new Properties(); + /*Properties brokerProps = new Properties(); brokerProps.setProperty("zookeeper.connect", zkConnect); brokerProps.setProperty("broker.id", "0"); brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); KafkaConfig config = new KafkaConfig(brokerProps); Time mock = new MockTime(); - kafkaServer = TestUtils.createServer(config, mock); + kafkaServer = TestUtils.createServer(config, mock);*/ // create topic - AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + //AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); } @AfterClass public static void tearDownClass() { - kafkaServer.shutdown(); + //kafkaServer.shutdown(); zkClient.close(); zkServer.shutdown(); } @@ -144,7 +146,7 @@ public class KafkaEntranceProcessorTest { } - @Test + /*@Test public void testFetchingNewData() throws InterruptedException, ExecutionException, TimeoutException { Logger logger = Logger.getLogger(KafkaEntranceProcessorTest.class.getName()); @@ -187,6 +189,56 @@ public class KafkaEntranceProcessorTest { assertEquals("Number of sent and received instances", NUM_INSTANCES, z); + }*/ + + @Test + public void testFetchingNewDataWithAvro() throws InterruptedException, ExecutionException, TimeoutException { + Logger logger = Logger.getLogger(KafkaEntranceProcessorTest.class.getName()); + logger.log(Level.INFO, "AVRO"); + logger.log(Level.INFO, "testFetchingNewDataWithAvro"); + Properties props = TestUtilsForKafka.getConsumerProperties(); + props.setProperty("auto.offset.reset", "earliest"); + KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC, 10000, new KafkaAvroMapper()); + kep.onCreate(1); + +// prepare new thread for data producing + Thread th = new Thread(new Runnable() { + @Override + public void run() { + KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties()); + + Random r = new Random(); + InstancesHeader header = TestUtilsForKafka.generateHeader(10); + KafkaAvroMapper avroMapper = new KafkaAvroMapper(); + int i = 0; + for (i = 0; i < NUM_INSTANCES; i++) { + try { + //byte[] data = avroMapper.serialize(TestUtilsForKafka.getData(r, 10, header)); + byte[] data = KafkaAvroMapper.avroBurrSerialize(InstanceContentEvent.class, TestUtilsForKafka.getData(r, 10, header)); + if(data == null) + Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, "Serialize result: null ("+i+")"); + ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC, data); + long stat = producer.send(record).get(10, TimeUnit.DAYS).offset(); + Thread.sleep(5); + Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, "Sent avro message with ID={0} to Kafka!, offset={1}", new Object[]{i, stat}); + } catch (InterruptedException | ExecutionException | TimeoutException ex) { + Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex); + } + } + producer.flush(); + producer.close(); + } + }); + th.start(); + + int z = 0; + while (kep.hasNext() && z < NUM_INSTANCES) { + logger.log(Level.INFO, "{0} {1}", new Object[]{z++, kep.nextEvent().toString()}); + } + + assertEquals("Number of sent and received instances", NUM_INSTANCES, z); + + } // private Properties getProducerProperties() { http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java new file mode 100644 index 0000000..31f34fb --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java @@ -0,0 +1,138 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.I0Itec.zkclient.ZkClient; +import org.apache.kafka.common.utils.Time; +import org.apache.samoa.streams.kafka.topology.SimpleComponentFactory; +import org.apache.samoa.streams.kafka.topology.SimpleEngine; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.MockTime; +import kafka.utils.TestUtils; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; +import kafka.zk.EmbeddedZookeeper; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** +* +* @author Jakub Jankowski +*/ +@Ignore +public class KafkaTaskTest { + + private static final String ZKHOST = "10.255.251.202"; //10.255.251.202 + private static final String BROKERHOST = "10.255.251.214"; //10.255.251.214 + private static final String BROKERPORT = "6667"; //6667, local: 9092 + private static final String TOPIC = "samoa_test"; //samoa_test, local: test + private static final int NUM_INSTANCES = 500; + + + private static KafkaServer kafkaServer; + private static EmbeddedZookeeper zkServer; + private static ZkClient zkClient; + private static String zkConnect; + + @BeforeClass + public static void setUpClass() throws IOException { + // setup Zookeeper + zkServer = new EmbeddedZookeeper(); + zkConnect = ZKHOST + ":" + "2181"; //+ zkServer.port(); + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); + ZkUtils zkUtils = ZkUtils.apply(zkClient, false); + + // setup Broker + /*Properties brokerProps = new Properties(); + brokerProps.setProperty("zookeeper.connect", zkConnect); + brokerProps.setProperty("broker.id", "0"); + brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); + brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); + KafkaConfig config = new KafkaConfig(brokerProps); + Time mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock);*/ + + // create topic + //AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + } + + @AfterClass + public static void tearDownClass() { + //kafkaServer.shutdown(); + zkClient.close(); + zkServer.shutdown(); + } + + @Before + public void setUp() throws IOException { + + } + + @After + public void tearDown() { + + } + + @Test + public void testKafkaTask() throws InterruptedException, ExecutionException, TimeoutException { + Logger logger = Logger.getLogger(KafkaTaskTest.class.getName()); + logger.log(Level.INFO, "KafkaTask"); + Properties producerProps = TestUtilsForKafka.getProducerProperties(); + Properties consumerProps = TestUtilsForKafka.getConsumerProperties(); + + KafkaTask task = new KafkaTask(producerProps, consumerProps, "kafkaTaskTest", 10000, new KafkaJsonMapper(Charset.defaultCharset()), new KafkaJsonMapper(Charset.defaultCharset())); + task.setFactory(new SimpleComponentFactory()); + task.init(); + SimpleEngine.submitTopology(task.getTopology()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java index 4cd5135..7c1c7c0 100644 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java @@ -72,6 +72,7 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import static org.junit.Assert.*; @@ -79,11 +80,12 @@ import static org.junit.Assert.*; * * @author pwawrzyniak */ +@Ignore public class KafkaUtilsTest { - private static final String ZKHOST = "127.0.0.1"; - private static final String BROKERHOST = "127.0.0.1"; - private static final String BROKERPORT = "9092"; + private static final String ZKHOST = "10.255.251.202"; //10.255.251.202 + private static final String BROKERHOST = "10.255.251.214"; //10.255.251.214 + private static final String BROKERPORT = "6667"; //6667, local: 9092 private static final String TOPIC_R = "test-r"; private static final String TOPIC_S = "test-s"; @@ -102,29 +104,29 @@ public class KafkaUtilsTest { public static void setUpClass() throws IOException { // setup Zookeeper zkServer = new EmbeddedZookeeper(); - zkConnect = ZKHOST + ":" + zkServer.port(); + zkConnect = ZKHOST + ":" + "2181"; //+ zkServer.port(); zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); ZkUtils zkUtils = ZkUtils.apply(zkClient, false); // setup Broker - Properties brokerProps = new Properties(); + /*Properties brokerProps = new Properties(); brokerProps.setProperty("zookeeper.connect", zkConnect); brokerProps.setProperty("broker.id", "0"); brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafkaUtils-").toAbsolutePath().toString()); brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); KafkaConfig config = new KafkaConfig(brokerProps); Time mock = new MockTime(); - kafkaServer = TestUtils.createServer(config, mock); + kafkaServer = TestUtils.createServer(config, mock);*/ // create topics - AdminUtils.createTopic(zkUtils, TOPIC_R, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); - AdminUtils.createTopic(zkUtils, TOPIC_S, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + //AdminUtils.createTopic(zkUtils, TOPIC_R, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + //AdminUtils.createTopic(zkUtils, TOPIC_S, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); } @AfterClass public static void tearDownClass() { - kafkaServer.shutdown(); + //kafkaServer.shutdown(); zkClient.close(); zkServer.shutdown(); } @@ -167,7 +169,7 @@ public class KafkaUtilsTest { instance.initializeConsumer(topics); logger.log(Level.INFO, "Produce data"); - List expResult = sendAndGetMessages(500); + List expResult = sendAndGetMessages(50); logger.log(Level.INFO, "Get results from Kafka"); List<byte[]> result = instance.getKafkaMessages(); @@ -214,7 +216,7 @@ public class KafkaUtilsTest { Random r = new Random(); InstancesHeader header = TestUtilsForKafka.generateHeader(10); Gson gson = new Gson(); - for (int i = 0; i < 500; i++) { + for (int i = 0; i < 50; i++) { byte[] val = gson.toJson(TestUtilsForKafka.getData(r, 10, header)).getBytes(); sent.add(val); instance.sendKafkaMessage(TOPIC_S, val); http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java index 0d30429..8d85fd7 100644 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java @@ -52,10 +52,10 @@ import org.apache.samoa.moa.core.FastVector; */ public class TestUtilsForKafka { - private static final String ZKHOST = "127.0.0.1"; - private static final String BROKERHOST = "127.0.0.1"; - private static final String BROKERPORT = "9092"; - private static final String TOPIC = "test"; + private static final String ZKHOST = "10.255.251.202"; //10.255.251.202 + private static final String BROKERHOST = "10.255.251.214"; //10.255.251.214 + private static final String BROKERPORT = "6667"; //6667, local: 9092 + private static final String TOPIC = "samoa_test"; //samoa_test, local: test protected static InstanceContentEvent getData(Random instanceRandom, int numAtts, InstancesHeader header) { double[] attVals = new double[numAtts + 1]; @@ -126,7 +126,22 @@ public class TestUtilsForKafka { consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); consumerProps.setProperty("group.id", "test"); consumerProps.setProperty("auto.offset.reset", "earliest"); -// consumerProps.setProperty("client.id", "consumer0"); + //consumerProps.setProperty("client.id", "consumer0"); return consumerProps; } + + protected static Properties getConsumerProducerProperties() { + Properties props = new Properties(); + props.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); + props.put("enable.auto.commit", "true"); + props.put("auto.commit.interval.ms", "1000"); + props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + props.setProperty("group.id", "burrito"); + props.setProperty("auto.offset.reset", "earliest"); + props.setProperty("client.id", "burrito"); + return props; + } }
