Repository: apex-malhar
Updated Branches:
  refs/heads/master a4551b42f -> b8ca9d63f


Fixed Checkstyle and Log 4j Properties.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/b8ca9d63
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/b8ca9d63
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/b8ca9d63

Branch: refs/heads/master
Commit: b8ca9d63f9ab5996358cb8c874bf6b3ae463d33e
Parents: c79def4
Author: Apex Dev <[email protected]>
Authored: Wed Mar 29 14:09:17 2017 -0700
Committer: Lakshmi Prasanna Velineni <[email protected]>
Committed: Mon Apr 24 09:06:40 2017 -0700

----------------------------------------------------------------------
 examples/kafka/pom.xml                          | 20 +++++
 examples/kafka/src/assemble/appPackage.xml      | 20 +++++
 .../examples/kafka/hdfs2kafka/Application.java  | 29 ++++++--
 .../examples/kafka/kafka2hdfs/KafkaApp.java     | 23 +++++-
 .../kafka/kafka2hdfs/LineOutputOperator.java    | 37 ++++++++--
 .../META-INF/properties-hdfs2kafka.xml          | 20 +++++
 .../src/main/resources/META-INF/properties.xml  | 20 +++++
 .../kafka/hdfs2kafka/ApplicationTest.java       | 53 +++++++++----
 .../kafka/kafka2hdfs/ApplicationTest.java       | 78 ++++++++++++--------
 .../kafka/src/test/resources/log4j.properties   | 21 ++++++
 10 files changed, 262 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b8ca9d63/examples/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/examples/kafka/pom.xml b/examples/kafka/pom.xml
index c21bb20..497a39e 100644
--- a/examples/kafka/pom.xml
+++ b/examples/kafka/pom.xml
@@ -1,4 +1,24 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
 <project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
   <modelVersion>4.0.0</modelVersion>
   

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b8ca9d63/examples/kafka/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/kafka/src/assemble/appPackage.xml 
b/examples/kafka/src/assemble/appPackage.xml
index 7ad071c..a870807 100644
--- a/examples/kafka/src/assemble/appPackage.xml
+++ b/examples/kafka/src/assemble/appPackage.xml
@@ -1,3 +1,23 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
 <assembly 
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2";
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
     
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2
 http://maven.apache.org/xsd/assembly-1.1.2.xsd";>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b8ca9d63/examples/kafka/src/main/java/org/apache/apex/examples/kafka/hdfs2kafka/Application.java
----------------------------------------------------------------------
diff --git 
a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/hdfs2kafka/Application.java
 
b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/hdfs2kafka/Application.java
index 646c8e8..4b7d029 100644
--- 
a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/hdfs2kafka/Application.java
+++ 
b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/hdfs2kafka/Application.java
@@ -1,23 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.apex.examples.kafka.hdfs2kafka;
 
+import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator;
 import org.apache.hadoop.conf.Configuration;
 
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator;
-import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator;
 
-@ApplicationAnnotation(name="Hdfs2Kafka")
+@ApplicationAnnotation(name = "Hdfs2Kafka")
 public class Application implements StreamingApplication
 {
 
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
-    LineByLineFileInputOperator in = dag.addOperator("lines",
-                                                     
LineByLineFileInputOperator.class);
+    LineByLineFileInputOperator in = dag.addOperator("lines", 
LineByLineFileInputOperator.class);
 
     KafkaSinglePortOutputOperator<String,String> out = 
dag.addOperator("kafkaOutput", new 
KafkaSinglePortOutputOperator<String,String>());
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b8ca9d63/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/KafkaApp.java
----------------------------------------------------------------------
diff --git 
a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/KafkaApp.java
 
b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/KafkaApp.java
index 15f0182..6e0c5eb 100644
--- 
a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/KafkaApp.java
+++ 
b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/KafkaApp.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.apex.examples.kafka.kafka2hdfs;
 
 import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
@@ -8,15 +26,14 @@ import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 
-@ApplicationAnnotation(name="Kafka2HDFS")
+@ApplicationAnnotation(name = "Kafka2HDFS")
 public class KafkaApp implements StreamingApplication
 {
 
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
-    KafkaSinglePortInputOperator in
-      = dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
+    KafkaSinglePortInputOperator in = dag.addOperator("kafkaIn", new 
KafkaSinglePortInputOperator());
 
     
in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
     LineOutputOperator out = dag.addOperator("fileOut", new 
LineOutputOperator());

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b8ca9d63/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/LineOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/LineOutputOperator.java
 
b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/LineOutputOperator.java
index ef40a69..dec0f7e 100644
--- 
a/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/LineOutputOperator.java
+++ 
b/examples/kafka/src/main/java/org/apache/apex/examples/kafka/kafka2hdfs/LineOutputOperator.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.apex.examples.kafka.kafka2hdfs;
 
 import java.nio.charset.Charset;
@@ -19,16 +37,25 @@ public class LineOutputOperator extends 
AbstractFileOutputOperator<byte[]>
   private String baseName;
 
   @Override
-  public byte[] getBytesForTuple(byte[] t) {
+  public byte[] getBytesForTuple(byte[] t)
+  {
     String result = new String(t, CS) + NL;
     return result.getBytes(CS);
- }
+  }
 
   @Override
-  protected String getFileName(byte[] tuple) {
+  protected String getFileName(byte[] tuple)
+  {
     return baseName;
   }
 
-  public String getBaseName() { return baseName; }
-  public void setBaseName(String v) { baseName = v; }
+  public String getBaseName()
+  {
+    return baseName;
+  }
+
+  public void setBaseName(String v)
+  {
+    baseName = v;
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b8ca9d63/examples/kafka/src/main/resources/META-INF/properties-hdfs2kafka.xml
----------------------------------------------------------------------
diff --git 
a/examples/kafka/src/main/resources/META-INF/properties-hdfs2kafka.xml 
b/examples/kafka/src/main/resources/META-INF/properties-hdfs2kafka.xml
index 7c624ca..edae72d 100644
--- a/examples/kafka/src/main/resources/META-INF/properties-hdfs2kafka.xml
+++ b/examples/kafka/src/main/resources/META-INF/properties-hdfs2kafka.xml
@@ -1,4 +1,24 @@
 <?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
 <configuration>
   <property>
     <name>dt.operator.kafkaOutput.prop.topic</name>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b8ca9d63/examples/kafka/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/kafka/src/main/resources/META-INF/properties.xml 
b/examples/kafka/src/main/resources/META-INF/properties.xml
index a896168..69f8bfa 100644
--- a/examples/kafka/src/main/resources/META-INF/properties.xml
+++ b/examples/kafka/src/main/resources/META-INF/properties.xml
@@ -1,4 +1,24 @@
 <?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
 <configuration>
   <!-- 
   <property>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b8ca9d63/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java
----------------------------------------------------------------------
diff --git 
a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java
 
b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java
index aa63ee5..d00236b 100644
--- 
a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java
+++ 
b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.apex.examples.kafka.hdfs2kafka;
 
 import java.io.File;
@@ -26,20 +44,20 @@ import static org.junit.Assert.assertTrue;
 /**
  * Test the DAG declaration in local mode.
  */
-public class ApplicationTest {
+public class ApplicationTest
+{
   private static final Logger LOG = 
LoggerFactory.getLogger(ApplicationTest.class);
   private static final String TOPIC = "hdfs2kafka";
   private static final String directory = "target/hdfs2kafka";
   private static final String FILE_NAME = "messages.txt";
 
   private static final int zkPort = 2181;
-  private static final int  brokerPort = 9092;
+  private static final int brokerPort = 9092;
   private static final String BROKER = "localhost:" + brokerPort;
   //private static final String FILE_PATH = FILE_DIR + "/" + FILE_NAME + ".0"; 
    // first part
 
   // test messages
-  private static String[] lines =
-  {
+  private static String[] lines = {
     "1st line",
     "2nd line",
     "3rd line",
@@ -51,9 +69,9 @@ public class ApplicationTest {
   @Rule
   public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);
 
-
   @Test
-  public void testApplication() throws IOException, Exception {
+  public void testApplication() throws IOException, Exception
+  {
     try {
       // create file in monitored HDFS directory
       createFile();
@@ -71,7 +89,8 @@ public class ApplicationTest {
   }
 
   // create a file with content from 'lines'
-  private void createFile() throws IOException {
+  private void createFile() throws IOException
+  {
     // remove old file and create new one
     File file = new File(directory, FILE_NAME);
     FileUtils.deleteQuietly(file);
@@ -82,11 +101,11 @@ public class ApplicationTest {
       LOG.error("Error: Failed to create file {} in {}", FILE_NAME, directory);
       e.printStackTrace();
     }
-    LOG.debug("Created file {} with {} lines in {}",
-              FILE_NAME, lines.length, directory);
+    LOG.debug("Created file {} with {} lines in {}", FILE_NAME, lines.length, 
directory);
   }
 
-  private LocalMode.Controller asyncRun() throws Exception {
+  private LocalMode.Controller asyncRun() throws Exception
+  {
     Configuration conf = getConfig();
     LocalMode lma = LocalMode.newInstance();
     lma.prepareDAG(new Application(), conf);
@@ -95,14 +114,16 @@ public class ApplicationTest {
     return lc;
   }
 
-  private Configuration getConfig() {
-      Configuration conf = new Configuration(false);
-      
conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-hdfs2kafka.xml"));
-      conf.set("dt.operator.lines.prop.directory", directory);
-      return conf;
+  private Configuration getConfig()
+  {
+    Configuration conf = new Configuration(false);
+    
conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-hdfs2kafka.xml"));
+    conf.set("dt.operator.lines.prop.directory", directory);
+    return conf;
   }
 
-  private void chkOutput() throws Exception {
+  private void chkOutput() throws Exception
+  {
     KafkaUnit ku = kafkaUnitRule.getKafkaUnit();
     List<String> messages = null;
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b8ca9d63/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java
----------------------------------------------------------------------
diff --git 
a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java
 
b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java
index 80d84fa..be38c53 100644
--- 
a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java
+++ 
b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java
@@ -1,5 +1,20 @@
 /**
- * Put your copyright and license info here.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.apex.examples.kafka.kafka2hdfs;
 
@@ -30,20 +45,20 @@ import static org.junit.Assert.assertTrue;
 /**
  * Test the DAG declaration in local mode.
  */
-public class ApplicationTest {
+public class ApplicationTest
+{
   private static final Logger LOG = 
LoggerFactory.getLogger(ApplicationTest.class);
   private static final String TOPIC = "kafka2hdfs";
 
   private static final int zkPort = 2181;
-  private static final int  brokerPort = 9092;
+  private static final int brokerPort = 9092;
   private static final String BROKER = "localhost:" + brokerPort;
   private static final String FILE_NAME = "test";
-  private static final String FILE_DIR  = "/tmp/FromKafka";
+  private static final String FILE_DIR = "/tmp/FromKafka";
   private static final String FILE_PATH = FILE_DIR + "/" + FILE_NAME + ".0";   
  // first part
 
   // test messages
-  private static String[] lines =
-  {
+  private static String[] lines = {
     "1st line",
     "2nd line",
     "3rd line",
@@ -56,7 +71,8 @@ public class ApplicationTest {
   public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);
 
   @Test
-  public void testApplication() throws Exception {
+  public void testApplication() throws Exception
+  {
     try {
       // delete output file if it exists
       File file = new File(FILE_PATH);
@@ -80,7 +96,8 @@ public class ApplicationTest {
     }
   }
 
-  private void writeToTopic() {
+  private void writeToTopic()
+  {
     KafkaUnit ku = kafkaUnitRule.getKafkaUnit();
     ku.createTopic(TOPIC);
     for (String line : lines) {
@@ -90,25 +107,26 @@ public class ApplicationTest {
     LOG.debug("Sent messages to topic {}", TOPIC);
   }
 
-  private Configuration getConfig() {
-      Configuration conf = new Configuration(false);
-      String pre = "dt.operator.kafkaIn.prop.";
-      conf.setEnum(pre + "initialOffset",
-                   AbstractKafkaInputOperator.InitialOffset.EARLIEST);
-      conf.setInt(pre + "initialPartitionCount", 1);
-      conf.set(   pre + "topics",                TOPIC);
-      conf.set(   pre + "clusters",              BROKER);
-
-      pre = "dt.operator.fileOut.prop.";
-      conf.set(   pre + "filePath",        FILE_DIR);
-      conf.set(   pre + "baseName",        FILE_NAME);
-      conf.setInt(pre + "maxLength",       40);
-      conf.setInt(pre + "rotationWindows", 3);
-
-      return conf;
+  private Configuration getConfig()
+  {
+    Configuration conf = new Configuration(false);
+    String pre = "dt.operator.kafkaIn.prop.";
+    conf.setEnum(pre + "initialOffset", 
AbstractKafkaInputOperator.InitialOffset.EARLIEST);
+    conf.setInt(pre + "initialPartitionCount", 1);
+    conf.set(pre + "topics", TOPIC);
+    conf.set(pre + "clusters", BROKER);
+
+    pre = "dt.operator.fileOut.prop.";
+    conf.set(pre + "filePath", FILE_DIR);
+    conf.set(pre + "baseName", FILE_NAME);
+    conf.setInt(pre + "maxLength", 40);
+    conf.setInt(pre + "rotationWindows", 3);
+
+    return conf;
   }
 
-  private LocalMode.Controller asyncRun() throws Exception {
+  private LocalMode.Controller asyncRun() throws Exception
+  {
     Configuration conf = getConfig();
     LocalMode lma = LocalMode.newInstance();
     lma.prepareDAG(new KafkaApp(), conf);
@@ -117,20 +135,22 @@ public class ApplicationTest {
     return lc;
   }
 
-  private static void chkOutput() throws Exception {
+  private static void chkOutput() throws Exception
+  {
     File file = new File(FILE_PATH);
     final int MAX = 60;
-    for (int i = 0; i < MAX && (! file.exists()); ++i ) {
+    for (int i = 0; i < MAX && (!file.exists()); ++i) {
       LOG.debug("Sleeping, i = {}", i);
       Thread.sleep(1000);
     }
-    if (! file.exists()) {
+    if (!file.exists()) {
       String msg = String.format("Error: %s not found after %d seconds%n", 
FILE_PATH, MAX);
       throw new RuntimeException(msg);
     }
   }
 
-  private static void compare() throws Exception {
+  private static void compare() throws Exception
+  {
     // read output file
     File file = new File(FILE_PATH);
     BufferedReader br = new BufferedReader(new FileReader(file));

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b8ca9d63/examples/kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/kafka/src/test/resources/log4j.properties 
b/examples/kafka/src/test/resources/log4j.properties
index 3bfcdc5..0a1b8cb 100644
--- a/examples/kafka/src/test/resources/log4j.properties
+++ b/examples/kafka/src/test/resources/log4j.properties
@@ -1,8 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
 log4j.rootLogger=DEBUG,CONSOLE
 
 log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
 log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
 log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M 
- %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=WARN
 
 log4j.appender.RFA=org.apache.log4j.RollingFileAppender
 log4j.appender.RFA.layout=org.apache.log4j.PatternLayout

Reply via email to