repo cleanup, split code into 3 branches Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/8fbfde79 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/8fbfde79 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/8fbfde79
Branch: refs/heads/master Commit: 8fbfde793f459cc680479d76f9d1b1fe86f561d5 Parents: fb17e7f Author: pwawrzyniak <[email protected]> Authored: Wed Jun 7 12:21:08 2017 +0200 Committer: nkourtellis <[email protected]> Committed: Fri Jul 21 21:12:18 2017 +0300 ---------------------------------------------------------------------- .../samoa/streams/kafka/KafkaAvroMapper.java | 160 --------- .../samoa/streams/kafka/KafkaJsonMapper.java | 106 ------ .../streams/kafka/avro/SamoaDatumReader.java | 115 ------- .../kafka/topology/SimpleComponentFactory.java | 53 --- .../streams/kafka/topology/SimpleEngine.java | 37 --- .../topology/SimpleEntranceProcessingItem.java | 33 -- .../kafka/topology/SimpleProcessingItem.java | 87 ----- .../streams/kafka/topology/SimpleStream.java | 95 ------ .../streams/kafka/topology/SimpleTopology.java | 46 --- .../kafka/AvroSerializerDeserializerTest.java | 70 ---- .../kafka/KafkaDestinationProcessorTest.java | 2 +- .../kafka/KafkaEntranceProcessorTest.java | 66 +--- .../samoa/streams/kafka/KafkaTaskTest.java | 327 +++++++++---------- .../samoa/streams/kafka/OosTestSerializer.java | 60 ++++ .../kafka/topology/SimpleComponentFactory.java | 53 +++ .../streams/kafka/topology/SimpleEngine.java | 37 +++ .../topology/SimpleEntranceProcessingItem.java | 33 ++ .../kafka/topology/SimpleProcessingItem.java | 87 +++++ .../streams/kafka/topology/SimpleStream.java | 95 ++++++ .../streams/kafka/topology/SimpleTopology.java | 46 +++ 20 files changed, 578 insertions(+), 1030 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java deleted file mode 100644 index a045bed..0000000 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Copyright 2017 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.samoa.streams.kafka; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; - -import org.apache.avro.Schema; -import org.apache.avro.io.BinaryEncoder; -import org.apache.avro.io.DatumReader; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.io.Decoder; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.io.EncoderFactory; -import org.apache.avro.reflect.ReflectData; -import org.apache.avro.reflect.ReflectDatumWriter; -import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.avro.specific.SpecificRecord; -import org.apache.samoa.learners.InstanceContentEvent; -import org.apache.samoa.streams.kafka.avro.SamoaDatumReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2017 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -/** - * Sample class for serializing and deserializing {@link InstanceContentEvent} - * from/to Avro format - * - * @author Jakub Jankowski - * @version 0.5.0-incubating-SNAPSHOT - * @since 0.5.0-incubating - */ -public class KafkaAvroMapper implements KafkaDeserializer<InstanceContentEvent>, KafkaSerializer<InstanceContentEvent> { - - private static Logger logger = LoggerFactory.getLogger(KafkaAvroMapper.class); - - @Override - public byte[] serialize(InstanceContentEvent message) { - return avroSerialize(InstanceContentEvent.class, message); - } - - @Override - public InstanceContentEvent deserialize(byte[] message) { - return avroDeserialize(message, InstanceContentEvent.class); - } - - - /** - * Avro serialization based on specified schema - * @param cls - * @param v - * @return - */ - public static <V> byte[] avroSerialize(final Class<V> cls, final V v) { - ByteArrayOutputStream bout = new ByteArrayOutputStream(); - try { - Schema schema = new Schema.Parser().parse(KafkaAvroMapper.class.getResourceAsStream("/kafka.avsc")); - DatumWriter<V> writer; - - if (v instanceof SpecificRecord) { - writer = new SpecificDatumWriter<>(schema); - } else { - writer = new ReflectDatumWriter<>(schema); - } - - BinaryEncoder binEncoder = EncoderFactory.get().binaryEncoder(bout, null); - writer.write(v, binEncoder); - binEncoder.flush(); - - } catch (IOException e) { - e.printStackTrace(); - } catch (final Exception e) { - throw new RuntimeException(e); - } - - return bout.toByteArray(); - - } - - /** - * Avro deserialization based on specified schema - * @param cls - * @param v - * @return - */ - public static <V> V avroDeserialize(byte[] avroBytes, Class<V> clazz) { - V ret = null; - try { - Schema schema = new Schema.Parser().parse(KafkaAvroMapper.class.getResourceAsStream("/kafka.avsc")); - ByteArrayInputStream in = new ByteArrayInputStream(avroBytes); - DatumReader<V> reader = new SamoaDatumReader<>(schema); - - Decoder decoder = DecoderFactory.get().directBinaryDecoder(in, null); - - ret = reader.read(null, decoder); - } catch (IOException e) { - e.printStackTrace(); - } catch (final Exception e) { - throw new RuntimeException(e); - } - - return ret; - } - - /** - * Avro serialization using reflection - * @param cls - * @param v - * @return - */ - public static <V> byte[] toBytesGeneric(final Class<V> cls, final V v) { - final ByteArrayOutputStream bout = new ByteArrayOutputStream(); - final Schema schema = ReflectData.AllowNull.get().getSchema(cls); - final DatumWriter<V> writer = new ReflectDatumWriter<V>(schema); - final BinaryEncoder binEncoder = EncoderFactory.get().binaryEncoder(bout, null); - try { - writer.write(v, binEncoder); - binEncoder.flush(); - } catch (final Exception e) { - throw new RuntimeException(e); - } - - return bout.toByteArray(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java deleted file mode 100644 index 2ac3e04..0000000 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright 2017 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.samoa.streams.kafka; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2017 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.InstanceCreator; -import com.google.gson.JsonDeserializationContext; -import com.google.gson.JsonDeserializer; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParseException; -import java.lang.reflect.Type; -import java.nio.charset.Charset; -import org.apache.samoa.instances.DenseInstanceData; -import org.apache.samoa.instances.InstanceData; -import org.apache.samoa.learners.InstanceContentEvent; - -/** - * Sample class for serializing and deserializing {@link InstanceContentEvent} - * from/to JSON format - * - * @author pwawrzyniak - * @version 0.5.0-incubating-SNAPSHOT - * @since 0.5.0-incubating - */ -public class KafkaJsonMapper implements KafkaDeserializer<InstanceContentEvent>, KafkaSerializer<InstanceContentEvent> { - - private final transient Gson gson; - private final Charset charset; - - /** - * Class constructor - * - * @param charset Charset to be used for bytes parsing - */ - public KafkaJsonMapper(Charset charset) { - this.gson = new GsonBuilder().registerTypeAdapter(InstanceData.class, new InstanceDataCustomDeserializer()).create(); - this.charset = charset; - } - - @Override - public InstanceContentEvent deserialize(byte[] message) { - return gson.fromJson(new String(message, this.charset), InstanceContentEvent.class); - } - - @Override - public byte[] serialize(InstanceContentEvent message) { - return gson.toJson(message).getBytes(this.charset); - } - - //Unused - public class InstanceDataCreator implements InstanceCreator<InstanceData> { - - @Override - public InstanceData createInstance(Type type) { - return new DenseInstanceData(); - } - } - - public class InstanceDataCustomDeserializer implements JsonDeserializer<InstanceData> { - - @Override - public DenseInstanceData deserialize(JsonElement je, Type type, JsonDeserializationContext jdc) throws JsonParseException { - double[] attributeValues = null; - JsonObject obj = (JsonObject) je; - attributeValues = jdc.deserialize(obj.get("attributeValues"), double[].class); - DenseInstanceData did = new DenseInstanceData(attributeValues); - return did; - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/main/java/org/apache/samoa/streams/kafka/avro/SamoaDatumReader.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/avro/SamoaDatumReader.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/avro/SamoaDatumReader.java deleted file mode 100644 index b7a18aa..0000000 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/avro/SamoaDatumReader.java +++ /dev/null @@ -1,115 +0,0 @@ -package org.apache.samoa.streams.kafka.avro; - -import java.io.IOException; - -import org.apache.avro.AvroRuntimeException; -import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; -import org.apache.avro.generic.GenericData.Array; -import org.apache.avro.generic.IndexedRecord; -import org.apache.avro.io.ResolvingDecoder; -import org.apache.avro.reflect.ReflectData; -import org.apache.avro.reflect.ReflectDatumReader; -import org.apache.avro.specific.SpecificRecordBase; -import org.apache.samoa.instances.DenseInstanceData; -import org.apache.samoa.instances.SingleClassInstanceData; -import org.apache.samoa.instances.SparseInstanceData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * DatumReader used to read objects built with InstanceData classes - * @author Jakub Jankowski - * - * @param <T> - */ -public class SamoaDatumReader<T> extends ReflectDatumReader<T> { - - private static Logger logger = LoggerFactory.getLogger(SamoaDatumReader.class); - - public SamoaDatumReader() { - super(); - } - - /** Construct for reading instances of a class. */ - public SamoaDatumReader(Class<T> c) { - super(c); - } - - /** Construct where the writer's and reader's schemas are the same. */ - public SamoaDatumReader(Schema root) { - super(root); - } - - /** Construct given writer's and reader's schema. */ - public SamoaDatumReader(Schema writer, Schema reader) { - super(writer, reader); - } - - /** Construct given writer's and reader's schema and the data model. */ - public SamoaDatumReader(Schema writer, Schema reader, ReflectData data) { - super(writer, reader, data); - } - - /** Construct given a {@link ReflectData}. */ - public SamoaDatumReader(ReflectData data) { - super(data); - } - - @Override - /** - * Called to read a record instance. Overridden to read InstanceData. - */ - protected Object readRecord(Object old, Schema expected, ResolvingDecoder in) throws IOException { - Object r = getData().newRecord(old, expected); - Object state = null; - - for (Field f : in.readFieldOrder()) { - int pos = f.pos(); - String name = f.name(); - Object oldDatum = null; - if (r instanceof DenseInstanceData) { - r = readDenseInstanceData(r, f, oldDatum, in, state); - } else if (r instanceof SparseInstanceData) { - r = readSparseInstanceData(r, f, oldDatum, in, state); - } else - readField(r, f, oldDatum, in, state); - } - - return r; - } - - private Object readDenseInstanceData(Object record, Field f, Object oldDatum, ResolvingDecoder in, Object state) - throws IOException { - if (f.name().equals("attributeValues")) { - Array atributes = (Array) read(oldDatum, f.schema(), in); - double[] atributesArr = new double[atributes.size()]; - for (int i = 0; i < atributes.size(); i++) { - atributesArr[i] = (double) atributes.get(i); - } - return new DenseInstanceData(atributesArr); - } - return null; - } - - private Object readSparseInstanceData(Object record, Field f, Object oldDatum, ResolvingDecoder in, Object state) - throws IOException { - if(f.name().equals("attributeValues")) { - Array atributes = (Array) read(oldDatum, f.schema(), in); - double[] atributesArr = new double[atributes.size()]; - for (int i = 0; i < atributes.size(); i++) - atributesArr[i] = (double) atributes.get(i); - ((SparseInstanceData)record).setAttributeValues(atributesArr); - } - if(f.name().equals("indexValues")) { - Array indexValues = (Array) read(oldDatum, f.schema(), in); - int[] indexValuesArr = new int[indexValues.size()]; - for (int i = 0; i < indexValues.size(); i++) { - indexValuesArr[i] = (int) indexValues.get(i); - } - ((SparseInstanceData)record).setIndexValues(indexValuesArr); - } - return record; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java deleted file mode 100644 index 155ce1f..0000000 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java +++ /dev/null @@ -1,53 +0,0 @@ -package org.apache.samoa.streams.kafka.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.apache.samoa.core.EntranceProcessor; -import org.apache.samoa.core.Processor; -import org.apache.samoa.topology.ComponentFactory; -import org.apache.samoa.topology.EntranceProcessingItem; -import org.apache.samoa.topology.IProcessingItem; -import org.apache.samoa.topology.ProcessingItem; -import org.apache.samoa.topology.Stream; -import org.apache.samoa.topology.Topology; - -public class SimpleComponentFactory implements ComponentFactory { - - public ProcessingItem createPi(Processor processor, int paralellism) { - return new SimpleProcessingItem(processor, paralellism); - } - - public ProcessingItem createPi(Processor processor) { - return this.createPi(processor, 1); - } - - public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) { - return new SimpleEntranceProcessingItem(processor); - } - - public Stream createStream(IProcessingItem sourcePi) { - return new SimpleStream(sourcePi); - } - - public Topology createTopology(String topoName) { - return new SimpleTopology(topoName); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java deleted file mode 100644 index d446018..0000000 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * To change this template, choose Tools | Templates - * and open the template in the editor. - */ -package org.apache.samoa.streams.kafka.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.apache.samoa.topology.Topology; - -public class SimpleEngine { - - public static void submitTopology(Topology topology) { - SimpleTopology simpleTopology = (SimpleTopology) topology; - simpleTopology.run(); - // runs until completion - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java deleted file mode 100644 index 4c626dc..0000000 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.apache.samoa.streams.kafka.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.apache.samoa.core.EntranceProcessor; -import org.apache.samoa.topology.LocalEntranceProcessingItem; - -class SimpleEntranceProcessingItem extends LocalEntranceProcessingItem { - public SimpleEntranceProcessingItem(EntranceProcessor processor) { - super(processor); - } - - // The default waiting time when there is no available events is 100ms - // Override waitForNewEvents() to change it -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java deleted file mode 100644 index 3549b85..0000000 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * To change this template, choose Tools | Templates - * and open the template in the editor. - */ -package org.apache.samoa.streams.kafka.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.apache.samoa.core.ContentEvent; -import org.apache.samoa.core.Processor; -import org.apache.samoa.topology.AbstractProcessingItem; -import org.apache.samoa.topology.IProcessingItem; -import org.apache.samoa.topology.ProcessingItem; -import org.apache.samoa.topology.Stream; -import org.apache.samoa.utils.PartitioningScheme; -import org.apache.samoa.utils.StreamDestination; - -/** - * - * @author abifet - */ -class SimpleProcessingItem extends AbstractProcessingItem { - private IProcessingItem[] arrayProcessingItem; - - SimpleProcessingItem(Processor processor) { - super(processor); - } - - SimpleProcessingItem(Processor processor, int parallelism) { - super(processor); - this.setParallelism(parallelism); - } - - public IProcessingItem getProcessingItem(int i) { - return arrayProcessingItem[i]; - } - - @Override - protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) { - StreamDestination destination = new StreamDestination(this, this.getParallelism(), scheme); - ((SimpleStream) inputStream).addDestination(destination); - return this; - } - - public SimpleProcessingItem copy() { - Processor processor = this.getProcessor(); - return new SimpleProcessingItem(processor.newProcessor(processor)); - } - - public void processEvent(ContentEvent event, int counter) { - - int parallelism = this.getParallelism(); - // System.out.println("Process event "+event+" (isLast="+event.isLastEvent()+") with counter="+counter+" while parallelism="+parallelism); - if (this.arrayProcessingItem == null && parallelism > 0) { - // Init processing elements, the first time they are needed - this.arrayProcessingItem = new IProcessingItem[parallelism]; - for (int j = 0; j < parallelism; j++) { - arrayProcessingItem[j] = this.copy(); - arrayProcessingItem[j].getProcessor().onCreate(j); - } - } - if (this.arrayProcessingItem != null) { - IProcessingItem pi = this.getProcessingItem(counter); - Processor p = pi.getProcessor(); - // System.out.println("PI="+pi+", p="+p); - this.getProcessingItem(counter).getProcessor().process(event); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java deleted file mode 100644 index 269e0cc..0000000 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * To change this template, choose Tools | Templates - * and open the template in the editor. - */ -package org.apache.samoa.streams.kafka.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.util.LinkedList; -import java.util.List; - -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.samoa.core.ContentEvent; -import org.apache.samoa.topology.AbstractStream; -import org.apache.samoa.topology.IProcessingItem; -import org.apache.samoa.utils.StreamDestination; - -/** - * - * @author abifet - */ -class SimpleStream extends AbstractStream { - private List<StreamDestination> destinations; - private int maxCounter; - private int eventCounter; - - SimpleStream(IProcessingItem sourcePi) { - super(sourcePi); - this.destinations = new LinkedList<>(); - this.eventCounter = 0; - this.maxCounter = 1; - } - - private int getNextCounter() { - if (maxCounter > 0 && eventCounter >= maxCounter) - eventCounter = 0; - this.eventCounter++; - return this.eventCounter; - } - - @Override - public void put(ContentEvent event) { - this.put(event, this.getNextCounter()); - } - - private void put(ContentEvent event, int counter) { - SimpleProcessingItem pi; - int parallelism; - for (StreamDestination destination : destinations) { - pi = (SimpleProcessingItem) destination.getProcessingItem(); - parallelism = destination.getParallelism(); - switch (destination.getPartitioningScheme()) { - case SHUFFLE: - pi.processEvent(event, counter % parallelism); - break; - case GROUP_BY_KEY: - HashCodeBuilder hb = new HashCodeBuilder(); - hb.append(event.getKey()); - int key = hb.build() % parallelism; - pi.processEvent(event, key); - break; - case BROADCAST: - for (int p = 0; p < parallelism; p++) { - pi.processEvent(event, p); - } - break; - } - } - } - - public void addDestination(StreamDestination destination) { - this.destinations.add(destination); - if (maxCounter <= 0) - maxCounter = 1; - maxCounter *= destination.getParallelism(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java deleted file mode 100644 index 98dd7a5..0000000 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * To change this template, choose Tools | Templates - * and open the template in the editor. - */ -package org.apache.samoa.streams.kafka.topology; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.apache.samoa.topology.AbstractTopology; - -public class SimpleTopology extends AbstractTopology { - SimpleTopology(String name) { - super(name); - } - - public void run() { - if (this.getEntranceProcessingItems() == null) - throw new IllegalStateException("You need to set entrance PI before running the topology."); - if (this.getEntranceProcessingItems().size() != 1) - throw new IllegalStateException("SimpleTopology supports 1 entrance PI only. Number of entrance PIs is " - + this.getEntranceProcessingItems().size()); - - SimpleEntranceProcessingItem entrancePi = (SimpleEntranceProcessingItem) this.getEntranceProcessingItems() - .toArray()[0]; - entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple mode - entrancePi.startSendingEvents(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/test/java/org/apache/samoa/streams/kafka/AvroSerializerDeserializerTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/AvroSerializerDeserializerTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/AvroSerializerDeserializerTest.java deleted file mode 100644 index 1a1a718..0000000 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/AvroSerializerDeserializerTest.java +++ /dev/null @@ -1,70 +0,0 @@ -package org.apache.samoa.streams.kafka; - -import static org.junit.Assert.assertTrue; - -import java.util.Random; -import java.util.logging.Logger; - -import org.apache.samoa.instances.InstancesHeader; -import org.apache.samoa.learners.InstanceContentEvent; -import org.apache.samoa.streams.kafka.KafkaAvroMapper; -import org.junit.Test; - -public class AvroSerializerDeserializerTest { - - private Logger logger = Logger.getLogger(AvroSerializerDeserializerTest.class.getName()); - public AvroSerializerDeserializerTest() {} - - @Test - public void testAvroSerialize() { - Random r = new Random(); - InstancesHeader header = TestUtilsForKafka.generateHeader(10); - InstanceContentEvent eventToSerialize = TestUtilsForKafka.getData(r, 10, header); - byte[] data = KafkaAvroMapper.avroSerialize(InstanceContentEvent.class, eventToSerialize); - - InstanceContentEvent eventDeserialized = KafkaAvroMapper.avroDeserialize(data, InstanceContentEvent.class); - - assertTrue("Serialized and deserialized event", isEqual(eventToSerialize, eventDeserialized)); - - } - - public boolean isEqual(InstanceContentEvent a, InstanceContentEvent b) { - if(a.getClassId() != b.getClassId()) { - logger.info("a.getClassId() != b.getClassId(): " + (a.getClassId() != b.getClassId())); - return false; - } - if(a.isLastEvent() != b.isLastEvent()) { - logger.info("a.isLastEvent() != b.isLastEvent(): " + (a.isLastEvent() != b.isLastEvent())); - return false; - } - if(a.isTesting() != b.isTesting()) { - logger.info("a.isTesting() != b.isTesting(): " + (a.isTesting() != b.isTesting())); - return false; - } - if(a.isTraining() != b.isTraining()) { - logger.info("a.isTraining() != b.isTraining(): " + (a.isTraining() != b.isTraining())); - return false; - } - if(a.getClassifierIndex() != b.getClassifierIndex()) { - logger.info("a.getClassifierIndex() != b.getClassifierIndex(): " + (a.getClassifierIndex() != b.getClassifierIndex())); - return false; - } - if(a.getEvaluationIndex() != b.getEvaluationIndex()) { - logger.info("a.getEvaluationIndex() != b.getEvaluationIndex(): " + (a.getEvaluationIndex() != b.getEvaluationIndex())); - return false; - } - if(a.getInstanceIndex() != b.getInstanceIndex()) { - logger.info("a.getInstanceIndex() != b.getInstanceIndex(): " + (a.getInstanceIndex() != b.getInstanceIndex())); - return false; - } - if(!a.getInstance().toString().equals(b.getInstance().toString())) { - logger.info("a.getInstance().toString()!= b.getInstance().toString(): " + (a.getInstance().toString()!= b.getInstance().toString())); - logger.info("a.toString(): " + a.getInstance().toString()); - logger.info("b.toString(): " + b.getInstance().toString()); - return false; - } - - return true; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java index bf45ffb..2d59456 100644 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java @@ -135,7 +135,7 @@ public class KafkaDestinationProcessorTest { final Logger logger = Logger.getLogger(KafkaDestinationProcessorTest.class.getName()); Properties props = TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT); props.setProperty("auto.offset.reset", "earliest"); - KafkaDestinationProcessor kdp = new KafkaDestinationProcessor(props, TOPIC, new KafkaJsonMapper(Charset.defaultCharset())); + KafkaDestinationProcessor kdp = new KafkaDestinationProcessor(props, TOPIC, new OosTestSerializer()); kdp.onCreate(1); final int[] i = {0}; http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java index 933ba2a..b8b5c72 100644 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java @@ -138,80 +138,32 @@ public class KafkaEntranceProcessorTest { } @Test - public void testFetchingNewDataWithJson() throws InterruptedException, ExecutionException, TimeoutException { + public void testFetchingNewData() throws InterruptedException, ExecutionException, TimeoutException { final Logger logger = Logger.getLogger(KafkaEntranceProcessorTest.class.getName()); - logger.log(Level.INFO, "JSON"); - logger.log(Level.INFO, "testFetchingNewDataWithJson"); + logger.log(Level.INFO, "OOS"); + logger.log(Level.INFO, "testFetchingNewData"); Properties props = TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT); props.setProperty("auto.offset.reset", "earliest"); - KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC_JSON, TIMEOUT, new KafkaJsonMapper(Charset.defaultCharset())); + KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC_JSON, TIMEOUT, new OosTestSerializer()); kep.onCreate(1); - + // prepare new thread for data producing Thread th = new Thread(new Runnable() { @Override public void run() { - KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT)); + KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT)); Random r = new Random(); InstancesHeader header = TestUtilsForKafka.generateHeader(10); - Gson gson = new Gson(); + OosTestSerializer serializer = new OosTestSerializer(); int i = 0; for (i = 0; i < NUM_INSTANCES; i++) { try { InstanceContentEvent event = TestUtilsForKafka.getData(r, 10, header); - - ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC_JSON, gson.toJson(event).getBytes()); - long stat = producer.send(record).get(10, TimeUnit.SECONDS).offset(); - } catch (InterruptedException | ExecutionException | TimeoutException ex) { - Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex); - } - } - producer.flush(); - producer.close(); - } - }); - th.start(); - int z = 0; - while (z < NUM_INSTANCES && kep.hasNext()) { - InstanceContentEvent event = (InstanceContentEvent) kep.nextEvent(); - z++; - } - - assertEquals("Number of sent and received instances", NUM_INSTANCES, z); - - } - - @Test - public void testFetchingNewDataWithAvro() throws InterruptedException, ExecutionException, TimeoutException { - Logger logger = Logger.getLogger(KafkaEntranceProcessorTest.class.getName()); - logger.log(Level.INFO, "AVRO"); - logger.log(Level.INFO, "testFetchingNewDataWithAvro"); - Properties props = TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT); - props.setProperty("auto.offset.reset", "earliest"); - KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC_AVRO, TIMEOUT, new KafkaAvroMapper()); - kep.onCreate(1); - -// prepare new thread for data producing - Thread th = new Thread(new Runnable() { - @Override - public void run() { - KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT)); - - Random r = new Random(); - InstancesHeader header = TestUtilsForKafka.generateHeader(10); - - int i = 0; - for (i = 0; i < NUM_INSTANCES; i++) { - try { - byte[] data = KafkaAvroMapper.avroSerialize(InstanceContentEvent.class, TestUtilsForKafka.getData(r, 10, header)); - if (data == null) { - Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, "Serialize result: null ({0})", i); - } - ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC_AVRO, data); + ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC_JSON, serializer.serialize(event)); long stat = producer.send(record).get(10, TimeUnit.SECONDS).offset(); } catch (InterruptedException | ExecutionException | TimeoutException ex) { Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex); @@ -227,9 +179,9 @@ public class KafkaEntranceProcessorTest { while (z < NUM_INSTANCES && kep.hasNext()) { InstanceContentEvent event = (InstanceContentEvent) kep.nextEvent(); z++; -// logger.log(Level.INFO, "{0} {1}", new Object[]{z, event.getInstance().toString()}); } assertEquals("Number of sent and received instances", NUM_INSTANCES, z); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java index 08aae11..4215b08 100644 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java @@ -1,170 +1,157 @@ -/* - * Copyright 2017 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.samoa.streams.kafka; - -import com.google.gson.Gson; -import java.io.IOException; -import java.nio.charset.Charset; -import java.nio.file.Files; -import java.util.Properties; -import java.util.Random; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.I0Itec.zkclient.ZkClient; -import org.apache.kafka.common.utils.Time; -import org.apache.samoa.streams.kafka.topology.SimpleComponentFactory; -import org.apache.samoa.streams.kafka.topology.SimpleEngine; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; - -import kafka.admin.AdminUtils; -import kafka.admin.RackAwareMode; -import kafka.server.KafkaConfig; -import kafka.server.KafkaServer; -import kafka.utils.MockTime; -import kafka.utils.TestUtils; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; -import kafka.zk.EmbeddedZookeeper; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.samoa.instances.InstancesHeader; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2017 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -/** -* -* @author Jakub Jankowski -*/ -@Ignore -public class KafkaTaskTest { - - private static final String ZKHOST = "127.0.0.1";//10.255.251.202"; //10.255.251.202 - private static final String BROKERHOST = "127.0.0.1";//"10.255.251.214"; //10.255.251.214 - private static final String BROKERPORT = "9092"; //6667, local: 9092 - private static final String TOPIC = "samoa_test"; //samoa_test, local: test - private static final int NUM_INSTANCES = 125922; - - - private static KafkaServer kafkaServer; - private static EmbeddedZookeeper zkServer; - private static ZkClient zkClient; - private static String zkConnect; - - @BeforeClass - public static void setUpClass() throws IOException { - // setup Zookeeper -// zkServer = new EmbeddedZookeeper(); -// zkConnect = ZKHOST + ":" + "2181"; //+ zkServer.port(); -// zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); -// ZkUtils zkUtils = ZkUtils.apply(zkClient, false); - - // setup Broker - /*Properties brokerProps = new Properties(); - brokerProps.setProperty("zookeeper.connect", zkConnect); - brokerProps.setProperty("broker.id", "0"); - brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); - brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); - KafkaConfig config = new KafkaConfig(brokerProps); - Time mock = new MockTime(); - kafkaServer = TestUtils.createServer(config, mock);*/ - - // create topic - //AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); - } - - @AfterClass - public static void tearDownClass() { - //kafkaServer.shutdown(); -// zkClient.close(); -// zkServer.shutdown(); - } - - @Before - public void setUp() throws IOException { - - } - - @After - public void tearDown() { - - } - - @Test - public void testKafkaTask() throws InterruptedException, ExecutionException, TimeoutException { - Logger logger = Logger.getLogger(KafkaTaskTest.class.getName()); - logger.log(Level.INFO, "KafkaTask"); - Properties producerProps = TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT); - Properties consumerProps = TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT); - - KafkaTask task = new KafkaTask(producerProps, consumerProps, "kafkaTaskTest", 10000, new KafkaJsonMapper(Charset.defaultCharset()), new KafkaJsonMapper(Charset.defaultCharset())); - task.setFactory(new SimpleComponentFactory()); - task.init(); - SimpleEngine.submitTopology(task.getTopology()); - - Thread th = new Thread(new Runnable() { - @Override - public void run() { - KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT)); - - Random r = new Random(); - InstancesHeader header = TestUtilsForKafka.generateHeader(10); - Gson gson = new Gson(); - int i = 0; - for (i = 0; i < NUM_INSTANCES; i++) { - try { - ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC, gson.toJson(TestUtilsForKafka.getData(r, 10, header)).getBytes()); - long stat = producer.send(record).get(10, TimeUnit.DAYS).offset(); -// Thread.sleep(5); - Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, "Sent message with ID={0} to Kafka!, offset={1}", new Object[]{i, stat}); - } catch (InterruptedException | ExecutionException | TimeoutException ex) { - Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex); - } - } - producer.flush(); - producer.close(); - } - }); - th.start(); - - } -} +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +import java.io.IOException; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; + + +import org.I0Itec.zkclient.ZkClient; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import kafka.server.KafkaServer; +import kafka.zk.EmbeddedZookeeper; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.samoa.instances.InstancesHeader; +import org.apache.samoa.streams.kafka.topology.SimpleComponentFactory; +import org.apache.samoa.streams.kafka.topology.SimpleEngine; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +/** + * + * @author Jakub Jankowski + */ +@Ignore +public class KafkaTaskTest { + + private static final String ZKHOST = "127.0.0.1";//10.255.251.202"; //10.255.251.202 + private static final String BROKERHOST = "127.0.0.1";//"10.255.251.214"; //10.255.251.214 + private static final String BROKERPORT = "9092"; //6667, local: 9092 + private static final String TOPIC = "samoa_test"; //samoa_test, local: test + private static final int NUM_INSTANCES = 125922; + + private static KafkaServer kafkaServer; + private static EmbeddedZookeeper zkServer; + private static ZkClient zkClient; + private static String zkConnect; + + @BeforeClass + public static void setUpClass() throws IOException { + // setup Zookeeper +// zkServer = new EmbeddedZookeeper(); +// zkConnect = ZKHOST + ":" + "2181"; //+ zkServer.port(); +// zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); +// ZkUtils zkUtils = ZkUtils.apply(zkClient, false); + + // setup Broker + /*Properties brokerProps = new Properties(); + brokerProps.setProperty("zookeeper.connect", zkConnect); + brokerProps.setProperty("broker.id", "0"); + brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); + brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); + KafkaConfig config = new KafkaConfig(brokerProps); + Time mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock);*/ + // create topic + //AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + } + + @AfterClass + public static void tearDownClass() { + //kafkaServer.shutdown(); +// zkClient.close(); +// zkServer.shutdown(); + } + + @Before + public void setUp() throws IOException { + + } + + @After + public void tearDown() { + + } + + @Test + public void testKafkaTask() throws InterruptedException, ExecutionException, TimeoutException { + Logger logger = Logger.getLogger(KafkaTaskTest.class.getName()); + logger.log(Level.INFO, "KafkaTask"); + Properties producerProps = TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT); + Properties consumerProps = TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT); + + KafkaTask task = new KafkaTask(producerProps, consumerProps, "kafkaTaskTest", 10000, new OosTestSerializer(), new OosTestSerializer()); + task.setFactory(new SimpleComponentFactory()); + task.init(); + SimpleEngine.submitTopology(task.getTopology()); + + Thread th = new Thread(new Runnable() { + @Override + public void run() { + KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT)); + + Random r = new Random(); + InstancesHeader header = TestUtilsForKafka.generateHeader(10); + OosTestSerializer serializer = new OosTestSerializer(); + int i = 0; + for (i = 0; i < NUM_INSTANCES; i++) { + try { + ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC, serializer.serialize(TestUtilsForKafka.getData(r, 10, header))); + long stat = producer.send(record).get(10, TimeUnit.DAYS).offset(); +// Thread.sleep(5); + Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, "Sent message with ID={0} to Kafka!, offset={1}", new Object[]{i, stat}); + } catch (InterruptedException | ExecutionException | TimeoutException ex) { + Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex); + } + } + producer.flush(); + producer.close(); + } + }); + th.start(); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java new file mode 100644 index 0000000..649d3e0 --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java @@ -0,0 +1,60 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.samoa.learners.InstanceContentEvent; + +/** + * + * @author Piotr Wawrzyniak + */ +public class OosTestSerializer implements KafkaDeserializer<InstanceContentEvent>, KafkaSerializer<InstanceContentEvent> { + + @Override + public InstanceContentEvent deserialize(byte[] message) { + try { + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(message)); + InstanceContentEvent ice = (InstanceContentEvent)ois.readObject(); + return ice; + } catch (IOException | ClassNotFoundException ex) { + Logger.getLogger(OosTestSerializer.class.getName()).log(Level.SEVERE, null, ex); + } + return null; + } + + @Override + public byte[] serialize(InstanceContentEvent message) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(message); + oos.flush(); + return baos.toByteArray(); + } catch (IOException ex) { + Logger.getLogger(OosTestSerializer.class.getName()).log(Level.SEVERE, null, ex); + } + return null; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java new file mode 100644 index 0000000..202833e --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java @@ -0,0 +1,53 @@ +package org.apache.samoa.streams.kafka.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.ComponentFactory; +import org.apache.samoa.topology.EntranceProcessingItem; +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.topology.ProcessingItem; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.Topology; + +public class SimpleComponentFactory implements ComponentFactory { + + public ProcessingItem createPi(Processor processor, int paralellism) { + return new SimpleProcessingItem(processor, paralellism); + } + + public ProcessingItem createPi(Processor processor) { + return this.createPi(processor, 1); + } + + public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) { + return new SimpleEntranceProcessingItem(processor); + } + + public Stream createStream(IProcessingItem sourcePi) { + return new SimpleStream(sourcePi); + } + + public Topology createTopology(String topoName) { + return new SimpleTopology(topoName); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java new file mode 100644 index 0000000..338444b --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java @@ -0,0 +1,37 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package org.apache.samoa.streams.kafka.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.topology.Topology; + +public class SimpleEngine { + + public static void submitTopology(Topology topology) { + SimpleTopology simpleTopology = (SimpleTopology) topology; + simpleTopology.run(); + // runs until completion + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java new file mode 100644 index 0000000..26ed471 --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java @@ -0,0 +1,33 @@ +package org.apache.samoa.streams.kafka.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.topology.LocalEntranceProcessingItem; + +class SimpleEntranceProcessingItem extends LocalEntranceProcessingItem { + public SimpleEntranceProcessingItem(EntranceProcessor processor) { + super(processor); + } + + // The default waiting time when there is no available events is 100ms + // Override waitForNewEvents() to change it +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java new file mode 100644 index 0000000..bac0398 --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java @@ -0,0 +1,87 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package org.apache.samoa.streams.kafka.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.AbstractProcessingItem; +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.topology.ProcessingItem; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.utils.PartitioningScheme; +import org.apache.samoa.utils.StreamDestination; + +/** + * + * @author abifet + */ +class SimpleProcessingItem extends AbstractProcessingItem { + private IProcessingItem[] arrayProcessingItem; + + SimpleProcessingItem(Processor processor) { + super(processor); + } + + SimpleProcessingItem(Processor processor, int parallelism) { + super(processor); + this.setParallelism(parallelism); + } + + public IProcessingItem getProcessingItem(int i) { + return arrayProcessingItem[i]; + } + + @Override + protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) { + StreamDestination destination = new StreamDestination(this, this.getParallelism(), scheme); + ((SimpleStream) inputStream).addDestination(destination); + return this; + } + + public SimpleProcessingItem copy() { + Processor processor = this.getProcessor(); + return new SimpleProcessingItem(processor.newProcessor(processor)); + } + + public void processEvent(ContentEvent event, int counter) { + + int parallelism = this.getParallelism(); + // System.out.println("Process event "+event+" (isLast="+event.isLastEvent()+") with counter="+counter+" while parallelism="+parallelism); + if (this.arrayProcessingItem == null && parallelism > 0) { + // Init processing elements, the first time they are needed + this.arrayProcessingItem = new IProcessingItem[parallelism]; + for (int j = 0; j < parallelism; j++) { + arrayProcessingItem[j] = this.copy(); + arrayProcessingItem[j].getProcessor().onCreate(j); + } + } + if (this.arrayProcessingItem != null) { + IProcessingItem pi = this.getProcessingItem(counter); + Processor p = pi.getProcessor(); + // System.out.println("PI="+pi+", p="+p); + this.getProcessingItem(counter).getProcessor().process(event); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java new file mode 100644 index 0000000..8405463 --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java @@ -0,0 +1,95 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package org.apache.samoa.streams.kafka.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.LinkedList; +import java.util.List; + +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.topology.AbstractStream; +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.utils.StreamDestination; + +/** + * + * @author abifet + */ +class SimpleStream extends AbstractStream { + private List<StreamDestination> destinations; + private int maxCounter; + private int eventCounter; + + SimpleStream(IProcessingItem sourcePi) { + super(sourcePi); + this.destinations = new LinkedList<>(); + this.eventCounter = 0; + this.maxCounter = 1; + } + + private int getNextCounter() { + if (maxCounter > 0 && eventCounter >= maxCounter) + eventCounter = 0; + this.eventCounter++; + return this.eventCounter; + } + + @Override + public void put(ContentEvent event) { + this.put(event, this.getNextCounter()); + } + + private void put(ContentEvent event, int counter) { + SimpleProcessingItem pi; + int parallelism; + for (StreamDestination destination : destinations) { + pi = (SimpleProcessingItem) destination.getProcessingItem(); + parallelism = destination.getParallelism(); + switch (destination.getPartitioningScheme()) { + case SHUFFLE: + pi.processEvent(event, counter % parallelism); + break; + case GROUP_BY_KEY: + HashCodeBuilder hb = new HashCodeBuilder(); + hb.append(event.getKey()); + int key = hb.build() % parallelism; + pi.processEvent(event, key); + break; + case BROADCAST: + for (int p = 0; p < parallelism; p++) { + pi.processEvent(event, p); + } + break; + } + } + } + + public void addDestination(StreamDestination destination) { + this.destinations.add(destination); + if (maxCounter <= 0) + maxCounter = 1; + maxCounter *= destination.getParallelism(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java new file mode 100644 index 0000000..d298b69 --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java @@ -0,0 +1,46 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package org.apache.samoa.streams.kafka.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.topology.AbstractTopology; + +public class SimpleTopology extends AbstractTopology { + SimpleTopology(String name) { + super(name); + } + + public void run() { + if (this.getEntranceProcessingItems() == null) + throw new IllegalStateException("You need to set entrance PI before running the topology."); + if (this.getEntranceProcessingItems().size() != 1) + throw new IllegalStateException("SimpleTopology supports 1 entrance PI only. Number of entrance PIs is " + + this.getEntranceProcessingItems().size()); + + SimpleEntranceProcessingItem entrancePi = (SimpleEntranceProcessingItem) this.getEntranceProcessingItems() + .toArray()[0]; + entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple mode + entrancePi.startSendingEvents(); + } +}
