Minor changes, code cleanup etc.

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/eeb06916
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/eeb06916
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/eeb06916

Branch: refs/heads/master
Commit: eeb0691654ead552d10686694974a172ec0a8e92
Parents: 8fbfde7
Author: pwawrzyniak <[email protected]>
Authored: Mon Jun 19 15:08:05 2017 +0200
Committer: nkourtellis <[email protected]>
Committed: Fri Jul 21 21:12:18 2017 +0300

----------------------------------------------------------------------
 .../streams/kafka/KafkaConsumerThread.java      |   4 -
 .../samoa/streams/kafka/KafkaDeserializer.java  |  44 ++-
 .../kafka/KafkaDestinationProcessor.java        |   4 -
 .../streams/kafka/KafkaEntranceProcessor.java   |   4 -
 .../samoa/streams/kafka/KafkaSerializer.java    |  44 ++-
 .../apache/samoa/streams/kafka/KafkaTask.java   | 292 +++++++++----------
 .../apache/samoa/streams/kafka/KafkaUtils.java  |  12 +-
 .../kafka/KafkaDestinationProcessorTest.java    |   5 -
 .../kafka/KafkaEntranceProcessorTest.java       |  14 +-
 .../samoa/streams/kafka/KafkaTaskTest.java      |   4 -
 .../samoa/streams/kafka/KafkaUtilsTest.java     |   4 -
 .../samoa/streams/kafka/OosTestSerializer.java  |   2 -
 .../samoa/streams/kafka/TestUtilsForKafka.java  |   7 -
 13 files changed, 191 insertions(+), 249 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java
index a93986e..fbd3ec6 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java
@@ -1,6 +1,4 @@
 /*
- * Copyright 2017 The Apache Software Foundation.
- *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -19,8 +17,6 @@ package org.apache.samoa.streams.kafka;
  * #%L
  * SAMOA
  * %%
- * Copyright (C) 2014 - 2017 Apache Software Foundation
- * %%
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java
index 7b11cbd..459c491 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java
@@ -1,6 +1,4 @@
 /*
- * Copyright 2017 The Apache Software Foundation.
- *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -13,28 +11,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.samoa.streams.kafka;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2017 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
 
 import org.apache.samoa.core.ContentEvent;
 

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
index 420d43c..231e25d 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
@@ -1,6 +1,4 @@
 /*
- * Copyright 2017 The Apache Software Foundation.
- *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -19,8 +17,6 @@ package org.apache.samoa.streams.kafka;
  * #%L
  * SAMOA
  * %%
- * Copyright (C) 2014 - 2017 Apache Software Foundation
- * %%
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
index 7079c58..ea5d06e 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
@@ -1,6 +1,4 @@
 /*
- * Copyright 2017 The Apache Software Foundation.
- *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -19,8 +17,6 @@ package org.apache.samoa.streams.kafka;
  * #%L
  * SAMOA
  * %%
- * Copyright (C) 2014 - 2017 Apache Software Foundation
- * %%
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java
index ad6bd8e..2bbc259 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java
@@ -1,6 +1,4 @@
 /*
- * Copyright 2017 The Apache Software Foundation.
- *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -13,28 +11,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.samoa.streams.kafka;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2017 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
 
 import org.apache.samoa.core.ContentEvent;
 

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java
index 26012f2..0c8f138 100644
--- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java
@@ -1,148 +1,144 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.samoa.streams.kafka;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2017 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Properties;
-
-import org.apache.samoa.tasks.Task;
-import org.apache.samoa.topology.ComponentFactory;
-import org.apache.samoa.topology.Stream;
-import org.apache.samoa.topology.Topology;
-import org.apache.samoa.topology.TopologyBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.github.javacliparser.Configurable;
-import com.github.javacliparser.IntOption;
-import com.github.javacliparser.StringOption;
-
-/**
- * Kafka task
- * 
- * @author Jakub Jankowski
- * @version 0.5.0-incubating-SNAPSHOT
- * @since 0.5.0-incubating
- *
- */
-
-public class KafkaTask implements Task, Configurable {
-
-       private static final long serialVersionUID = 3984474041982397855L;
-       private static Logger logger = LoggerFactory.getLogger(KafkaTask.class);
-       
-       //czy identyczne dla enterance i destination?
-       Properties producerProps;
-       Properties consumerProps;
-       int timeout;
-       private final KafkaDeserializer deserializer;
-       private final KafkaSerializer serializer;
-       private final String topic;
-
-       private TopologyBuilder builder;
-       private Topology kafkaTopology;
-
-       public IntOption kafkaParallelismOption = new 
IntOption("parallelismOption", 'p',
-                       "Number of destination Processors", 1, 1, 
Integer.MAX_VALUE);
-
-       public StringOption evaluationNameOption = new 
StringOption("evaluationName", 'n', "Identifier of the evaluation",
-                       "KafkaTask" + new 
SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
-
-       /**
-     * Class constructor
-     * @param props Properties of Kafka Producer and Consumer
-     * @see <a 
href="http://kafka.apache.org/documentation/#producerconfigs">Kafka Producer 
configuration</a>
-     * @see <a 
href="http://kafka.apache.org/documentation/#consumerconfigs">Kafka Consumer 
configuration</a>
-     * @param topic Topic to which destination processor will write into
-     * @param timeout Timeout used when polling Kafka for new messages
-     * @param serializer Implementation of KafkaSerializer that handles 
arriving data serialization
-     * @param serializer Implementation of KafkaDeserializer that handles 
arriving data deserialization
-     */
-       public KafkaTask(Properties producerProps, Properties consumerProps, 
String topic, int timeout, KafkaSerializer serializer, KafkaDeserializer 
deserializer) {
-               this.producerProps = producerProps;
-               this.consumerProps = consumerProps;
-               this.deserializer = deserializer;
-               this.serializer = serializer;
-               this.topic = topic;
-               this.timeout = timeout;
-       }
-
-       @Override
-       public void init() {
-               logger.info("Invoking init");
-               if (builder == null) {
-                       builder = new TopologyBuilder();
-                       logger.info("Successfully instantiating 
TopologyBuilder");
-
-                       builder.initTopology(evaluationNameOption.getValue());
-                       logger.info("Successfully initializing SAMOA topology 
with name {}", evaluationNameOption.getValue());
-               }
-               
-               // create enterance processor
-               KafkaEntranceProcessor sourceProcessor = new 
KafkaEntranceProcessor(consumerProps, topic, timeout, deserializer);
-               builder.addEntranceProcessor(sourceProcessor);
-               
-               // create stream
-               Stream stream = builder.createStream(sourceProcessor);
-               
-               // create destination processor
-               KafkaDestinationProcessor destProcessor = new 
KafkaDestinationProcessor(producerProps, topic, serializer);
-               builder.addProcessor(destProcessor, 
kafkaParallelismOption.getValue());
-               builder.connectInputShuffleStream(stream, destProcessor);
-               
-               // build topology
-               kafkaTopology = builder.build();
-           logger.info("Successfully built the topology");
-       }
-
-       @Override
-       public Topology getTopology() {
-               return kafkaTopology;
-       }
-
-       @Override
-       public void setFactory(ComponentFactory factory) {
-               logger.info("Invoking setFactory: "+factory.toString());
-               builder = new TopologyBuilder(factory);
-           logger.info("Successfully instantiating TopologyBuilder");
-
-           builder.initTopology(evaluationNameOption.getValue());
-           logger.info("Successfully initializing SAMOA topology with name 
{}", evaluationNameOption.getValue());
-
-       }
-
-}
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Properties;
+
+import org.apache.samoa.tasks.Task;
+import org.apache.samoa.topology.ComponentFactory;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.Topology;
+import org.apache.samoa.topology.TopologyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.Configurable;
+import com.github.javacliparser.IntOption;
+import com.github.javacliparser.StringOption;
+
+/**
+ * Kafka task
+ * 
+ * @author Jakub Jankowski
+ * @version 0.5.0-incubating-SNAPSHOT
+ * @since 0.5.0-incubating
+ *
+ */
+
+public class KafkaTask implements Task, Configurable {
+
+       private static final long serialVersionUID = 3984474041982397855L;
+       private static Logger logger = LoggerFactory.getLogger(KafkaTask.class);
+       
+       //czy identyczne dla enterance i destination?
+       Properties producerProps;
+       Properties consumerProps;
+       int timeout;
+       private final KafkaDeserializer deserializer;
+       private final KafkaSerializer serializer;
+       private final String topic;
+
+       private TopologyBuilder builder;
+       private Topology kafkaTopology;
+
+       public IntOption kafkaParallelismOption = new 
IntOption("parallelismOption", 'p',
+                       "Number of destination Processors", 1, 1, 
Integer.MAX_VALUE);
+
+       public StringOption evaluationNameOption = new 
StringOption("evaluationName", 'n', "Identifier of the evaluation",
+                       "KafkaTask" + new 
SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
+
+       /**
+     * Class constructor
+     * @param props Properties of Kafka Producer and Consumer
+     * @see <a 
href="http://kafka.apache.org/documentation/#producerconfigs">Kafka Producer 
configuration</a>
+     * @see <a 
href="http://kafka.apache.org/documentation/#consumerconfigs">Kafka Consumer 
configuration</a>
+     * @param topic Topic to which destination processor will write into
+     * @param timeout Timeout used when polling Kafka for new messages
+     * @param serializer Implementation of KafkaSerializer that handles 
arriving data serialization
+     * @param serializer Implementation of KafkaDeserializer that handles 
arriving data deserialization
+     */
+       public KafkaTask(Properties producerProps, Properties consumerProps, 
String topic, int timeout, KafkaSerializer serializer, KafkaDeserializer 
deserializer) {
+               this.producerProps = producerProps;
+               this.consumerProps = consumerProps;
+               this.deserializer = deserializer;
+               this.serializer = serializer;
+               this.topic = topic;
+               this.timeout = timeout;
+       }
+
+       @Override
+       public void init() {
+               logger.info("Invoking init");
+               if (builder == null) {
+                       builder = new TopologyBuilder();
+                       logger.info("Successfully instantiating 
TopologyBuilder");
+
+                       builder.initTopology(evaluationNameOption.getValue());
+                       logger.info("Successfully initializing SAMOA topology 
with name {}", evaluationNameOption.getValue());
+               }
+               
+               // create enterance processor
+               KafkaEntranceProcessor sourceProcessor = new 
KafkaEntranceProcessor(consumerProps, topic, timeout, deserializer);
+               builder.addEntranceProcessor(sourceProcessor);
+               
+               // create stream
+               Stream stream = builder.createStream(sourceProcessor);
+               
+               // create destination processor
+               KafkaDestinationProcessor destProcessor = new 
KafkaDestinationProcessor(producerProps, topic, serializer);
+               builder.addProcessor(destProcessor, 
kafkaParallelismOption.getValue());
+               builder.connectInputShuffleStream(stream, destProcessor);
+               
+               // build topology
+               kafkaTopology = builder.build();
+           logger.info("Successfully built the topology");
+       }
+
+       @Override
+       public Topology getTopology() {
+               return kafkaTopology;
+       }
+
+       @Override
+       public void setFactory(ComponentFactory factory) {
+               logger.info("Invoking setFactory: "+factory.toString());
+               builder = new TopologyBuilder(factory);
+           logger.info("Successfully instantiating TopologyBuilder");
+
+           builder.initTopology(evaluationNameOption.getValue());
+           logger.info("Successfully initializing SAMOA topology with name 
{}", evaluationNameOption.getValue());
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
index 75b5402..fb3aef7 100644
--- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
@@ -1,6 +1,4 @@
 /*
- * Copyright 2017 The Apache Software Foundation.
- *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -19,8 +17,6 @@ package org.apache.samoa.streams.kafka;
  * #%L
  * SAMOA
  * %%
- * Copyright (C) 2014 - 2017 Apache Software Foundation
- * %%
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -34,22 +30,16 @@ package org.apache.samoa.streams.kafka;
  * limitations under the License.
  * #L%
  */
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.logging.Level;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import java.util.logging.Logger;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 /**
  * Internal class responsible for Kafka Stream handling (both consume and

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java
 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java
index 2d59456..930ab23 100644
--- 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java
+++ 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java
@@ -1,6 +1,4 @@
 /*
- * Copyright 2017 The Apache Software Foundation.
- *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -19,8 +17,6 @@ package org.apache.samoa.streams.kafka;
  * #%L
  * SAMOA
  * %%
- * Copyright (C) 2014 - 2017 Apache Software Foundation
- * %%
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -35,7 +31,6 @@ package org.apache.samoa.streams.kafka;
  * #L%
  */
 import java.io.IOException;
-import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.Iterator;

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
index b8b5c72..55c3b85 100644
--- 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
+++ 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
@@ -79,15 +79,14 @@ public class KafkaEntranceProcessorTest {
     private static final String ZKHOST = "127.0.0.1";
     private static final String BROKERHOST = "127.0.0.1";
     private static final String BROKERPORT = "9092";
-    private static final String TOPIC_AVRO = "samoa_test-avro";
-    private static final String TOPIC_JSON = "samoa_test-json";
+    private static final String TOPIC_OOS = "samoa_test-oos";
     private static final int NUM_INSTANCES = 11111;
 
     private static KafkaServer kafkaServer;
     private static EmbeddedZookeeper zkServer;
     private static ZkClient zkClient;
     private static String zkConnect;
-    private static int TIMEOUT = 1000;
+    private static final int TIMEOUT = 1000;
 
     public KafkaEntranceProcessorTest() {
     }
@@ -110,9 +109,8 @@ public class KafkaEntranceProcessorTest {
         Time mock = new MockTime();
         kafkaServer = TestUtils.createServer(config, mock);
 
-        // create topics
-        AdminUtils.createTopic(zkUtils, TOPIC_AVRO, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
-        AdminUtils.createTopic(zkUtils, TOPIC_JSON, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
+        // create topics        
+        AdminUtils.createTopic(zkUtils, TOPIC_OOS, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
 
     }
 
@@ -145,7 +143,7 @@ public class KafkaEntranceProcessorTest {
         logger.log(Level.INFO, "testFetchingNewData");
         Properties props = TestUtilsForKafka.getConsumerProperties(BROKERHOST, 
BROKERPORT);
         props.setProperty("auto.offset.reset", "earliest");
-        KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, 
TOPIC_JSON, TIMEOUT, new OosTestSerializer());
+        KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, 
TOPIC_OOS, TIMEOUT, new OosTestSerializer());
 
         kep.onCreate(1);
 
@@ -163,7 +161,7 @@ public class KafkaEntranceProcessorTest {
                     try {
                         InstanceContentEvent event = 
TestUtilsForKafka.getData(r, 10, header);
 
-                        ProducerRecord<String, byte[]> record = new 
ProducerRecord(TOPIC_JSON, serializer.serialize(event));
+                        ProducerRecord<String, byte[]> record = new 
ProducerRecord(TOPIC_OOS, serializer.serialize(event));
                         long stat = producer.send(record).get(10, 
TimeUnit.SECONDS).offset();
                     } catch (InterruptedException | ExecutionException | 
TimeoutException ex) {
                         
Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, 
null, ex);

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java
index 4215b08..adecac1 100644
--- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java
@@ -1,6 +1,4 @@
 /*
- * Copyright 2017 The Apache Software Foundation.
- *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -45,8 +43,6 @@ import org.apache.samoa.streams.kafka.topology.SimpleEngine;
  * #%L
  * SAMOA
  * %%
- * Copyright (C) 2014 - 2017 Apache Software Foundation
- * %%
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
index 8f77504..5dc4542 100644
--- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
@@ -1,6 +1,4 @@
 /*
- * Copyright 2017 The Apache Software Foundation.
- *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -19,8 +17,6 @@ package org.apache.samoa.streams.kafka;
  * #%L
  * SAMOA
  * %%
- * Copyright (C) 2014 - 2017 Apache Software Foundation
- * %%
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java
index 649d3e0..2b64bec 100644
--- 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java
+++ 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java
@@ -1,6 +1,4 @@
 /*
- * Copyright 2017 The Apache Software Foundation.
- *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/eeb06916/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java
index 87ab16c..8936759 100644
--- 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java
+++ 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java
@@ -1,6 +1,4 @@
 /*
- * Copyright 2017 The Apache Software Foundation.
- *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -19,8 +17,6 @@ package org.apache.samoa.streams.kafka;
  * #%L
  * SAMOA
  * %%
- * Copyright (C) 2014 - 2017 Apache Software Foundation
- * %%
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -52,9 +48,6 @@ import org.apache.samoa.moa.core.FastVector;
  */
 public class TestUtilsForKafka {
 
-//    private static final String BROKERHOST = "127.0.0.1";
-//    private static final String BROKERPORT = "9092";                         
-
     protected static InstanceContentEvent getData(Random instanceRandom, int 
numAtts, InstancesHeader header) {
         double[] attVals = new double[numAtts + 1];
         double sum = 0.0;

Reply via email to