http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/transform/src/main/java/com/example/transform/DynamicTransformApplication.java
----------------------------------------------------------------------
diff --git a/examples/transform/src/main/java/com/example/transform/DynamicTransformApplication.java b/examples/transform/src/main/java/com/example/transform/DynamicTransformApplication.java
deleted file mode 100644
index a769016..0000000
--- a/examples/transform/src/main/java/com/example/transform/DynamicTransformApplication.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package com.example.transform;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StatsListener;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.common.partitioner.StatelessPartitioner;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner;
-import com.datatorrent.lib.transform.TransformOperator;
-
-@ApplicationAnnotation(name="DynamicTransformApp")
-public class DynamicTransformApplication implements StreamingApplication
-{
-  private static String COOL_DOWN_MILLIS = "dt.cooldown";
-  private static String MAX_THROUGHPUT = "dt.maxThroughput";
-  private static String MIN_THROUGHPUT = "dt.minThroughput";
-
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    POJOGenerator input = dag.addOperator("Input", new POJOGenerator());
-    TransformOperator transform = dag.addOperator("Process", new TransformOperator());
-    // Set expression map
-    Map<String, String> expMap = new HashMap<>();
-    expMap.put("name", "{$.firstName}.concat(\" \").concat({$.lastName})");
-    expMap.put("age", "(new java.util.Date()).getYear() - {$.dateOfBirth}.getYear()");
-    expMap.put("address", "{$.address}.toLowerCase()");
-    transform.setExpressionMap(expMap);
-    ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
-
-    dag.addStream("InputToTransform", input.output, transform.input);
-    dag.addStream("TransformToOutput", transform.output, output.input);
-
-    dag.setInputPortAttribute(transform.input, Context.PortContext.TUPLE_CLASS, CustomerEvent.class);
-    dag.setOutputPortAttribute(transform.output, Context.PortContext.TUPLE_CLASS, CustomerInfo.class);
-
-    StatelessThroughputBasedPartitioner<TransformOperator> partitioner = new StatelessThroughputBasedPartitioner<>();
-    partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000));
-    partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000));
-    partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000));
-    dag.setAttribute(transform, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));
-    dag.setAttribute(transform, Context.OperatorContext.PARTITIONER, partitioner);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/transform/src/main/java/com/example/transform/POJOGenerator.java
----------------------------------------------------------------------
diff --git a/examples/transform/src/main/java/com/example/transform/POJOGenerator.java b/examples/transform/src/main/java/com/example/transform/POJOGenerator.java
deleted file mode 100644
index 9db5fd1..0000000
--- a/examples/transform/src/main/java/com/example/transform/POJOGenerator.java
+++ /dev/null
@@ -1,125 +0,0 @@
-package com.example.transform;
-
-import java.util.Date;
-import java.util.Random;
-
-import javax.validation.constraints.Min;
-
-import org.apache.commons.lang3.RandomStringUtils;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-
-/**
- * Generates and emits the CustomerEvent
- */
-public class POJOGenerator implements InputOperator
-{
-  @Min(1)
-  private int maxCustomerId = 100000;
-  @Min(1)
-  private int maxNameLength = 10;
-  @Min(1)
-  private int maxAddressLength = 15;
-  private long tuplesCounter;
-  // Limit number of emitted tuples per window
-  @Min(1)
-  private long maxTuplesPerWindow = 100;
-  private final Random random = new Random();
-  private final RandomStringUtils rRandom = new RandomStringUtils();
-  public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
-
-  @Override
-  public void beginWindow(long windowId)
-  {
-    tuplesCounter = 0;
-  }
-
-  @Override
-  public void endWindow()
-  {
-  }
-
-  @Override
-  public void setup(Context.OperatorContext context)
-  {
-  }
-
-  @Override
-  public void teardown()
-  {
-
-  }
-
-  CustomerEvent generateCustomersEvent() throws Exception {
-
-    CustomerEvent customerEvent = new CustomerEvent();
-    customerEvent.setCustomerId(randomId(maxCustomerId));
-    customerEvent.setFirstName(rRandom.randomAlphabetic(randomId(maxNameLength)));
-    customerEvent.setLastName(rRandom.randomAlphabetic(randomId(maxNameLength)));
-    long val = random.nextLong();
-    long diff1 = val % System.currentTimeMillis();
-    customerEvent.setDateOfBirth(new Date(diff1));
-    customerEvent.setAddress(rRandom.randomAlphabetic(randomId(maxAddressLength)));
-    return customerEvent;
-  }
-
-  private int randomId(int max) {
-    if (max < 1) return 1;
-    return 1 + random.nextInt(max);
-  }
-
-  @Override
-  public void emitTuples()
-  {
-    while (tuplesCounter++ < maxTuplesPerWindow) {
-      try {
-        CustomerEvent event = generateCustomersEvent();
-        this.output.emit(event);
-      } catch (Exception ex) {
-        throw new RuntimeException(ex);
-      }
-    }
-  }
-
-  public long getMaxTuplesPerWindow()
-  {
-    return maxTuplesPerWindow;
-  }
-
-  public void setMaxTuplesPerWindow(long maxTuplesPerWindow)
-  {
-    this.maxTuplesPerWindow = maxTuplesPerWindow;
-  }
-
-  public int getMaxAddressLength()
-  {
-    return maxAddressLength;
-  }
-
-  public void setMaxAddressLength(int maxAddressLength)
-  {
-    this.maxAddressLength = maxAddressLength;
-  }
-
-  public int getMaxNameLength()
-  {
-    return maxNameLength;
-  }
-
-  public void setMaxNameLength(int maxNameLength)
-  {
-    this.maxNameLength = maxNameLength;
-  }
-
-  public int getMaxCustomerId()
-  {
-    return maxCustomerId;
-  }
-
-  public void setMaxCustomerId(int maxCustomerId)
-  {
-    this.maxCustomerId = maxCustomerId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/transform/src/main/java/org/apache/apex/examples/transform/Application.java
----------------------------------------------------------------------
diff --git a/examples/transform/src/main/java/org/apache/apex/examples/transform/Application.java b/examples/transform/src/main/java/org/apache/apex/examples/transform/Application.java
new file mode 100644
index 0000000..e6a0419
--- /dev/null
+++ b/examples/transform/src/main/java/org/apache/apex/examples/transform/Application.java
@@ -0,0 +1,39 @@
+package org.apache.apex.examples.transform;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.transform.TransformOperator;
+
+@ApplicationAnnotation(name="TransformExample")
+public class Application implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    POJOGenerator input = dag.addOperator("Input", new POJOGenerator());
+    TransformOperator transform = dag.addOperator("Process", new TransformOperator());
+    // Set expression map
+    Map<String, String> expMap = new HashMap<>();
+    expMap.put("name", "{$.firstName}.concat(\" \").concat({$.lastName})");
+    expMap.put("age", "(new java.util.Date()).getYear() - {$.dateOfBirth}.getYear()");
+    expMap.put("address", "{$.address}.toLowerCase()");
+    transform.setExpressionMap(expMap);
+    ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
+
+    dag.addStream("InputToTransform", input.output, transform.input);
+    dag.addStream("TransformToOutput", transform.output, output.input);
+
+    dag.setInputPortAttribute(transform.input, Context.PortContext.TUPLE_CLASS, CustomerEvent.class);
+    dag.setOutputPortAttribute(transform.output, Context.PortContext.TUPLE_CLASS, CustomerInfo.class);
+    dag.setAttribute(transform, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<TransformOperator>(2));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/transform/src/main/java/org/apache/apex/examples/transform/CustomerEvent.java
----------------------------------------------------------------------
diff --git a/examples/transform/src/main/java/org/apache/apex/examples/transform/CustomerEvent.java b/examples/transform/src/main/java/org/apache/apex/examples/transform/CustomerEvent.java
new file mode 100644
index 0000000..8011e12
--- /dev/null
+++ b/examples/transform/src/main/java/org/apache/apex/examples/transform/CustomerEvent.java
@@ -0,0 +1,74 @@
+package org.apache.apex.examples.transform;
+
+import java.util.Date;
+
+public class CustomerEvent
+{
+  private int customerId;
+  private String firstName;
+  private String lastName;
+  private Date dateOfBirth;
+  private String address;
+
+  public int getCustomerId()
+  {
+    return customerId;
+  }
+
+  public void setCustomerId(int customerId)
+  {
+    this.customerId = customerId;
+  }
+
+  public String getFirstName()
+  {
+    return firstName;
+  }
+
+  public void setFirstName(String firstName)
+  {
+    this.firstName = firstName;
+  }
+
+  public String getLastName()
+  {
+    return lastName;
+  }
+
+  public void setLastName(String lastName)
+  {
+    this.lastName = lastName;
+  }
+
+  public Date getDateOfBirth()
+  {
+    return dateOfBirth;
+  }
+
+  public void setDateOfBirth(Date dateOfBirth)
+  {
+    this.dateOfBirth = dateOfBirth;
+  }
+
+  public String getAddress()
+  {
+    return address;
+  }
+
+  public void setAddress(String address)
+  {
+    this.address = address;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "CustomerEvent{" +
+      "customerId=" + customerId +
+      ", firstName='" + firstName + '\'' +
+      ", lastName='" + lastName + '\'' +
+      ", dateOfBirth=" + dateOfBirth +
+      ", address='" + address + '\'' +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/transform/src/main/java/org/apache/apex/examples/transform/CustomerInfo.java
----------------------------------------------------------------------
diff --git a/examples/transform/src/main/java/org/apache/apex/examples/transform/CustomerInfo.java b/examples/transform/src/main/java/org/apache/apex/examples/transform/CustomerInfo.java
new file mode 100644
index 0000000..ece76a6
--- /dev/null
+++ b/examples/transform/src/main/java/org/apache/apex/examples/transform/CustomerInfo.java
@@ -0,0 +1,60 @@
+package org.apache.apex.examples.transform;
+
+public class CustomerInfo
+{
+  private int customerId;
+  private String name;
+  private int age;
+  private String address;
+
+  public int getCustomerId()
+  {
+    return customerId;
+  }
+
+  public void setCustomerId(int customerId)
+  {
+    this.customerId = customerId;
+  }
+
+  public String getName()
+  {
+    return name;
+  }
+
+  public void setName(String name)
+  {
+    this.name = name;
+  }
+
+  public int getAge()
+  {
+    return age;
+  }
+
+  public void setAge(int age)
+  {
+    this.age = age;
+  }
+
+  public String getAddress()
+  {
+    return address;
+  }
+
+  public void setAddress(String address)
+  {
+    this.address = address;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "CustomerInfo{" +
+      "customerId=" + customerId +
+      ", name='" + name + '\'' +
+      ", age=" + age +
+      ", address='" + address + '\'' +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/transform/src/main/java/org/apache/apex/examples/transform/DynamicTransformApplication.java
----------------------------------------------------------------------
diff --git a/examples/transform/src/main/java/org/apache/apex/examples/transform/DynamicTransformApplication.java b/examples/transform/src/main/java/org/apache/apex/examples/transform/DynamicTransformApplication.java
new file mode 100644
index 0000000..01bc446
--- /dev/null
+++ b/examples/transform/src/main/java/org/apache/apex/examples/transform/DynamicTransformApplication.java
@@ -0,0 +1,51 @@
+package org.apache.apex.examples.transform;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner;
+import com.datatorrent.lib.transform.TransformOperator;
+
+@ApplicationAnnotation(name="DynamicTransformApp")
+public class DynamicTransformApplication implements StreamingApplication
+{
+  private static String COOL_DOWN_MILLIS = "dt.cooldown";
+  private static String MAX_THROUGHPUT = "dt.maxThroughput";
+  private static String MIN_THROUGHPUT = "dt.minThroughput";
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    POJOGenerator input = dag.addOperator("Input", new POJOGenerator());
+    TransformOperator transform = dag.addOperator("Process", new TransformOperator());
+    // Set expression map
+    Map<String, String> expMap = new HashMap<>();
+    expMap.put("name", "{$.firstName}.concat(\" \").concat({$.lastName})");
+    expMap.put("age", "(new java.util.Date()).getYear() - {$.dateOfBirth}.getYear()");
+    expMap.put("address", "{$.address}.toLowerCase()");
+    transform.setExpressionMap(expMap);
+    ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
+
+    dag.addStream("InputToTransform", input.output, transform.input);
+    dag.addStream("TransformToOutput", transform.output, output.input);
+
+    dag.setInputPortAttribute(transform.input, Context.PortContext.TUPLE_CLASS, CustomerEvent.class);
+    dag.setOutputPortAttribute(transform.output, Context.PortContext.TUPLE_CLASS, CustomerInfo.class);
+
+    StatelessThroughputBasedPartitioner<TransformOperator> partitioner = new StatelessThroughputBasedPartitioner<>();
+    partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000));
+    partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000));
+    partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000));
+    dag.setAttribute(transform, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));
+    dag.setAttribute(transform, Context.OperatorContext.PARTITIONER, partitioner);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/transform/src/main/java/org/apache/apex/examples/transform/POJOGenerator.java
----------------------------------------------------------------------
diff --git a/examples/transform/src/main/java/org/apache/apex/examples/transform/POJOGenerator.java b/examples/transform/src/main/java/org/apache/apex/examples/transform/POJOGenerator.java
new file mode 100644
index 0000000..f8f3b22
--- /dev/null
+++ b/examples/transform/src/main/java/org/apache/apex/examples/transform/POJOGenerator.java
@@ -0,0 +1,125 @@
+package org.apache.apex.examples.transform;
+
+import java.util.Date;
+import java.util.Random;
+
+import javax.validation.constraints.Min;
+
+import org.apache.commons.lang3.RandomStringUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+
+/**
+ * Generates and emits the CustomerEvent
+ */
+public class POJOGenerator implements InputOperator
+{
+  @Min(1)
+  private int maxCustomerId = 100000;
+  @Min(1)
+  private int maxNameLength = 10;
+  @Min(1)
+  private int maxAddressLength = 15;
+  private long tuplesCounter;
+  // Limit number of emitted tuples per window
+  @Min(1)
+  private long maxTuplesPerWindow = 100;
+  private final Random random = new Random();
+  private final RandomStringUtils rRandom = new RandomStringUtils();
+  public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    tuplesCounter = 0;
+  }
+
+  @Override
+  public void endWindow()
+  {
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+  }
+
+  @Override
+  public void teardown()
+  {
+
+  }
+
+  CustomerEvent generateCustomersEvent() throws Exception {
+
+    CustomerEvent customerEvent = new CustomerEvent();
+    customerEvent.setCustomerId(randomId(maxCustomerId));
+    customerEvent.setFirstName(rRandom.randomAlphabetic(randomId(maxNameLength)));
+    customerEvent.setLastName(rRandom.randomAlphabetic(randomId(maxNameLength)));
+    long val = random.nextLong();
+    long diff1 = val % System.currentTimeMillis();
+    customerEvent.setDateOfBirth(new Date(diff1));
+    customerEvent.setAddress(rRandom.randomAlphabetic(randomId(maxAddressLength)));
+    return customerEvent;
+  }
+
+  private int randomId(int max) {
+    if (max < 1) return 1;
+    return 1 + random.nextInt(max);
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    while (tuplesCounter++ < maxTuplesPerWindow) {
+      try {
+        CustomerEvent event = generateCustomersEvent();
+        this.output.emit(event);
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+  }
+
+  public long getMaxTuplesPerWindow()
+  {
+    return maxTuplesPerWindow;
+  }
+
+  public void setMaxTuplesPerWindow(long maxTuplesPerWindow)
+  {
+    this.maxTuplesPerWindow = maxTuplesPerWindow;
+  }
+
+  public int getMaxAddressLength()
+  {
+    return maxAddressLength;
+  }
+
+  public void setMaxAddressLength(int maxAddressLength)
+  {
+    this.maxAddressLength = maxAddressLength;
+  }
+
+  public int getMaxNameLength()
+  {
+    return maxNameLength;
+  }
+
+  public void setMaxNameLength(int maxNameLength)
+  {
+    this.maxNameLength = maxNameLength;
+  }
+
+  public int getMaxCustomerId()
+  {
+    return maxCustomerId;
+  }
+
+  public void setMaxCustomerId(int maxCustomerId)
+  {
+    this.maxCustomerId = maxCustomerId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/transform/src/test/java/com/example/transform/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/transform/src/test/java/com/example/transform/ApplicationTest.java b/examples/transform/src/test/java/com/example/transform/ApplicationTest.java
deleted file mode 100644
index 2d331d2..0000000
--- a/examples/transform/src/test/java/com/example/transform/ApplicationTest.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package com.example.transform;
-
-import org.junit.Test;
-import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.api.LocalMode;
-
-public class ApplicationTest
-{
-  @Test
-  public void testApplication() throws Exception
-  {
-    LocalMode lma = LocalMode.newInstance();
-    Configuration conf = new Configuration(false);
-    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
-    lma.prepareDAG(new Application(), conf);
-    LocalMode.Controller lc = lma.getController();
-    lc.runAsync();
-    Thread.sleep(10 * 1000);
-    lc.shutdown();
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/transform/src/test/java/org/apache/apex/examples/transform/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/examples/transform/src/test/java/org/apache/apex/examples/transform/ApplicationTest.java b/examples/transform/src/test/java/org/apache/apex/examples/transform/ApplicationTest.java
new file mode 100644
index 0000000..aab8af8
--- /dev/null
+++ b/examples/transform/src/test/java/org/apache/apex/examples/transform/ApplicationTest.java
@@ -0,0 +1,21 @@
+package org.apache.apex.examples.transform;
+
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
+
+public class ApplicationTest
+{
+  @Test
+  public void testApplication() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+    lma.prepareDAG(new Application(), conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.runAsync();
+    Thread.sleep(10 * 1000);
+    lc.shutdown();
+  }
+}

Reply via email to