Minor changes, code cleanup etc. Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/eeb06916 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/eeb06916 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/eeb06916
Branch: refs/heads/master Commit: eeb0691654ead552d10686694974a172ec0a8e92 Parents: 8fbfde7 Author: pwawrzyniak <[email protected]> Authored: Mon Jun 19 15:08:05 2017 +0200 Committer: nkourtellis <[email protected]> Committed: Fri Jul 21 21:12:18 2017 +0300 ---------------------------------------------------------------------- .../streams/kafka/KafkaConsumerThread.java | 4 - .../samoa/streams/kafka/KafkaDeserializer.java | 44 ++- .../kafka/KafkaDestinationProcessor.java | 4 - .../streams/kafka/KafkaEntranceProcessor.java | 4 - .../samoa/streams/kafka/KafkaSerializer.java | 44 ++- .../apache/samoa/streams/kafka/KafkaTask.java | 292 +++++++++---------- .../apache/samoa/streams/kafka/KafkaUtils.java | 12 +- .../kafka/KafkaDestinationProcessorTest.java | 5 - .../kafka/KafkaEntranceProcessorTest.java | 14 +- .../samoa/streams/kafka/KafkaTaskTest.java | 4 - .../samoa/streams/kafka/KafkaUtilsTest.java | 4 - .../samoa/streams/kafka/OosTestSerializer.java | 2 - .../samoa/streams/kafka/TestUtilsForKafka.java | 7 - 13 files changed, 191 insertions(+), 249 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java index a93986e..fbd3ec6 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java @@ -1,6 +1,4 @@ /* - * Copyright 2017 The Apache Software Foundation. - * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -19,8 +17,6 @@ package org.apache.samoa.streams.kafka; * #%L * SAMOA * %% - * Copyright (C) 2014 - 2017 Apache Software Foundation - * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java index 7b11cbd..459c491 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java @@ -1,6 +1,4 @@ /* - * Copyright 2017 The Apache Software Foundation. - * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -13,28 +11,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.samoa.streams.kafka; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2017 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + import org.apache.samoa.core.ContentEvent; http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java index 420d43c..231e25d 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java @@ -1,6 +1,4 @@ /* - * Copyright 2017 The Apache Software Foundation. - * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -19,8 +17,6 @@ package org.apache.samoa.streams.kafka; * #%L * SAMOA * %% - * Copyright (C) 2014 - 2017 Apache Software Foundation - * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java index 7079c58..ea5d06e 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java @@ -1,6 +1,4 @@ /* - * Copyright 2017 The Apache Software Foundation. - * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -19,8 +17,6 @@ package org.apache.samoa.streams.kafka; * #%L * SAMOA * %% - * Copyright (C) 2014 - 2017 Apache Software Foundation - * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java index ad6bd8e..2bbc259 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java @@ -1,6 +1,4 @@ /* - * Copyright 2017 The Apache Software Foundation. - * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -13,28 +11,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.samoa.streams.kafka; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2017 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + import org.apache.samoa.core.ContentEvent; http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java index 26012f2..0c8f138 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java @@ -1,148 +1,144 @@ -/* - * Copyright 2017 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.samoa.streams.kafka; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2017 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Properties; - -import org.apache.samoa.tasks.Task; -import org.apache.samoa.topology.ComponentFactory; -import org.apache.samoa.topology.Stream; -import org.apache.samoa.topology.Topology; -import org.apache.samoa.topology.TopologyBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.github.javacliparser.Configurable; -import com.github.javacliparser.IntOption; -import com.github.javacliparser.StringOption; - -/** - * Kafka task - * - * @author Jakub Jankowski - * @version 0.5.0-incubating-SNAPSHOT - * @since 0.5.0-incubating - * - */ - -public class KafkaTask implements Task, Configurable { - - private static final long serialVersionUID = 3984474041982397855L; - private static Logger logger = LoggerFactory.getLogger(KafkaTask.class); - - //czy identyczne dla enterance i destination? - Properties producerProps; - Properties consumerProps; - int timeout; - private final KafkaDeserializer deserializer; - private final KafkaSerializer serializer; - private final String topic; - - private TopologyBuilder builder; - private Topology kafkaTopology; - - public IntOption kafkaParallelismOption = new IntOption("parallelismOption", 'p', - "Number of destination Processors", 1, 1, Integer.MAX_VALUE); - - public StringOption evaluationNameOption = new StringOption("evaluationName", 'n', "Identifier of the evaluation", - "KafkaTask" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())); - - /** - * Class constructor - * @param props Properties of Kafka Producer and Consumer - * @see <a href="http://kafka.apache.org/documentation/#producerconfigs">Kafka Producer configuration</a> - * @see <a href="http://kafka.apache.org/documentation/#consumerconfigs">Kafka Consumer configuration</a> - * @param topic Topic to which destination processor will write into - * @param timeout Timeout used when polling Kafka for new messages - * @param serializer Implementation of KafkaSerializer that handles arriving data serialization - * @param serializer Implementation of KafkaDeserializer that handles arriving data deserialization - */ - public KafkaTask(Properties producerProps, Properties consumerProps, String topic, int timeout, KafkaSerializer serializer, KafkaDeserializer deserializer) { - this.producerProps = producerProps; - this.consumerProps = consumerProps; - this.deserializer = deserializer; - this.serializer = serializer; - this.topic = topic; - this.timeout = timeout; - } - - @Override - public void init() { - logger.info("Invoking init"); - if (builder == null) { - builder = new TopologyBuilder(); - logger.info("Successfully instantiating TopologyBuilder"); - - builder.initTopology(evaluationNameOption.getValue()); - logger.info("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); - } - - // create enterance processor - KafkaEntranceProcessor sourceProcessor = new KafkaEntranceProcessor(consumerProps, topic, timeout, deserializer); - builder.addEntranceProcessor(sourceProcessor); - - // create stream - Stream stream = builder.createStream(sourceProcessor); - - // create destination processor - KafkaDestinationProcessor destProcessor = new KafkaDestinationProcessor(producerProps, topic, serializer); - builder.addProcessor(destProcessor, kafkaParallelismOption.getValue()); - builder.connectInputShuffleStream(stream, destProcessor); - - // build topology - kafkaTopology = builder.build(); - logger.info("Successfully built the topology"); - } - - @Override - public Topology getTopology() { - return kafkaTopology; - } - - @Override - public void setFactory(ComponentFactory factory) { - logger.info("Invoking setFactory: "+factory.toString()); - builder = new TopologyBuilder(factory); - logger.info("Successfully instantiating TopologyBuilder"); - - builder.initTopology(evaluationNameOption.getValue()); - logger.info("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); - - } - -} +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Properties; + +import org.apache.samoa.tasks.Task; +import org.apache.samoa.topology.ComponentFactory; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.Topology; +import org.apache.samoa.topology.TopologyBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.javacliparser.Configurable; +import com.github.javacliparser.IntOption; +import com.github.javacliparser.StringOption; + +/** + * Kafka task + * + * @author Jakub Jankowski + * @version 0.5.0-incubating-SNAPSHOT + * @since 0.5.0-incubating + * + */ + +public class KafkaTask implements Task, Configurable { + + private static final long serialVersionUID = 3984474041982397855L; + private static Logger logger = LoggerFactory.getLogger(KafkaTask.class); + + //czy identyczne dla enterance i destination? + Properties producerProps; + Properties consumerProps; + int timeout; + private final KafkaDeserializer deserializer; + private final KafkaSerializer serializer; + private final String topic; + + private TopologyBuilder builder; + private Topology kafkaTopology; + + public IntOption kafkaParallelismOption = new IntOption("parallelismOption", 'p', + "Number of destination Processors", 1, 1, Integer.MAX_VALUE); + + public StringOption evaluationNameOption = new StringOption("evaluationName", 'n', "Identifier of the evaluation", + "KafkaTask" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())); + + /** + * Class constructor + * @param props Properties of Kafka Producer and Consumer + * @see <a href="http://kafka.apache.org/documentation/#producerconfigs">Kafka Producer configuration</a> + * @see <a href="http://kafka.apache.org/documentation/#consumerconfigs">Kafka Consumer configuration</a> + * @param topic Topic to which destination processor will write into + * @param timeout Timeout used when polling Kafka for new messages + * @param serializer Implementation of KafkaSerializer that handles arriving data serialization + * @param serializer Implementation of KafkaDeserializer that handles arriving data deserialization + */ + public KafkaTask(Properties producerProps, Properties consumerProps, String topic, int timeout, KafkaSerializer serializer, KafkaDeserializer deserializer) { + this.producerProps = producerProps; + this.consumerProps = consumerProps; + this.deserializer = deserializer; + this.serializer = serializer; + this.topic = topic; + this.timeout = timeout; + } + + @Override + public void init() { + logger.info("Invoking init"); + if (builder == null) { + builder = new TopologyBuilder(); + logger.info("Successfully instantiating TopologyBuilder"); + + builder.initTopology(evaluationNameOption.getValue()); + logger.info("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); + } + + // create enterance processor + KafkaEntranceProcessor sourceProcessor = new KafkaEntranceProcessor(consumerProps, topic, timeout, deserializer); + builder.addEntranceProcessor(sourceProcessor); + + // create stream + Stream stream = builder.createStream(sourceProcessor); + + // create destination processor + KafkaDestinationProcessor destProcessor = new KafkaDestinationProcessor(producerProps, topic, serializer); + builder.addProcessor(destProcessor, kafkaParallelismOption.getValue()); + builder.connectInputShuffleStream(stream, destProcessor); + + // build topology + kafkaTopology = builder.build(); + logger.info("Successfully built the topology"); + } + + @Override + public Topology getTopology() { + return kafkaTopology; + } + + @Override + public void setFactory(ComponentFactory factory) { + logger.info("Invoking setFactory: "+factory.toString()); + builder = new TopologyBuilder(factory); + logger.info("Successfully instantiating TopologyBuilder"); + + builder.initTopology(evaluationNameOption.getValue()); + logger.info("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java index 75b5402..fb3aef7 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java @@ -1,6 +1,4 @@ /* - * Copyright 2017 The Apache Software Foundation. - * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -19,8 +17,6 @@ package org.apache.samoa.streams.kafka; * #%L * SAMOA * %% - * Copyright (C) 2014 - 2017 Apache Software Foundation - * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -34,22 +30,16 @@ package org.apache.samoa.streams.kafka; * limitations under the License. * #L% */ -import java.util.ArrayList; import java.util.Collection; -import java.util.Iterator; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.logging.Level; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import java.util.logging.Logger; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; -import java.util.logging.Level; -import java.util.logging.Logger; /** * Internal class responsible for Kafka Stream handling (both consume and http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java index 2d59456..930ab23 100644 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java @@ -1,6 +1,4 @@ /* - * Copyright 2017 The Apache Software Foundation. - * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -19,8 +17,6 @@ package org.apache.samoa.streams.kafka; * #%L * SAMOA * %% - * Copyright (C) 2014 - 2017 Apache Software Foundation - * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -35,7 +31,6 @@ package org.apache.samoa.streams.kafka; * #L% */ import java.io.IOException; -import java.nio.charset.Charset; import java.nio.file.Files; import java.util.Arrays; import java.util.Iterator; http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java index b8b5c72..55c3b85 100644 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java @@ -79,15 +79,14 @@ public class KafkaEntranceProcessorTest { private static final String ZKHOST = "127.0.0.1"; private static final String BROKERHOST = "127.0.0.1"; private static final String BROKERPORT = "9092"; - private static final String TOPIC_AVRO = "samoa_test-avro"; - private static final String TOPIC_JSON = "samoa_test-json"; + private static final String TOPIC_OOS = "samoa_test-oos"; private static final int NUM_INSTANCES = 11111; private static KafkaServer kafkaServer; private static EmbeddedZookeeper zkServer; private static ZkClient zkClient; private static String zkConnect; - private static int TIMEOUT = 1000; + private static final int TIMEOUT = 1000; public KafkaEntranceProcessorTest() { } @@ -110,9 +109,8 @@ public class KafkaEntranceProcessorTest { Time mock = new MockTime(); kafkaServer = TestUtils.createServer(config, mock); - // create topics - AdminUtils.createTopic(zkUtils, TOPIC_AVRO, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); - AdminUtils.createTopic(zkUtils, TOPIC_JSON, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + // create topics + AdminUtils.createTopic(zkUtils, TOPIC_OOS, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); } @@ -145,7 +143,7 @@ public class KafkaEntranceProcessorTest { logger.log(Level.INFO, "testFetchingNewData"); Properties props = TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT); props.setProperty("auto.offset.reset", "earliest"); - KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC_JSON, TIMEOUT, new OosTestSerializer()); + KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC_OOS, TIMEOUT, new OosTestSerializer()); kep.onCreate(1); @@ -163,7 +161,7 @@ public class KafkaEntranceProcessorTest { try { InstanceContentEvent event = TestUtilsForKafka.getData(r, 10, header); - ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC_JSON, serializer.serialize(event)); + ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC_OOS, serializer.serialize(event)); long stat = producer.send(record).get(10, TimeUnit.SECONDS).offset(); } catch (InterruptedException | ExecutionException | TimeoutException ex) { Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex); http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java index 4215b08..adecac1 100644 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java @@ -1,6 +1,4 @@ /* - * Copyright 2017 The Apache Software Foundation. - * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -45,8 +43,6 @@ import org.apache.samoa.streams.kafka.topology.SimpleEngine; * #%L * SAMOA * %% - * Copyright (C) 2014 - 2017 Apache Software Foundation - * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java index 8f77504..5dc4542 100644 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java @@ -1,6 +1,4 @@ /* - * Copyright 2017 The Apache Software Foundation. - * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -19,8 +17,6 @@ package org.apache.samoa.streams.kafka; * #%L * SAMOA * %% - * Copyright (C) 2014 - 2017 Apache Software Foundation - * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java index 649d3e0..2b64bec 100644 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java @@ -1,6 +1,4 @@ /* - * Copyright 2017 The Apache Software Foundation. - * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java index 87ab16c..8936759 100644 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java @@ -1,6 +1,4 @@ /* - * Copyright 2017 The Apache Software Foundation. - * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -19,8 +17,6 @@ package org.apache.samoa.streams.kafka; * #%L * SAMOA * %% - * Copyright (C) 2014 - 2017 Apache Software Foundation - * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -52,9 +48,6 @@ import org.apache.samoa.moa.core.FastVector; */ public class TestUtilsForKafka { -// private static final String BROKERHOST = "127.0.0.1"; -// private static final String BROKERPORT = "9092"; - protected static InstanceContentEvent getData(Random instanceRandom, int numAtts, InstancesHeader header) { double[] attVals = new double[numAtts + 1]; double sum = 0.0;
