http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/transform/src/main/java/com/example/transform/DynamicTransformApplication.java ---------------------------------------------------------------------- diff --git a/examples/transform/src/main/java/com/example/transform/DynamicTransformApplication.java b/examples/transform/src/main/java/com/example/transform/DynamicTransformApplication.java deleted file mode 100644 index a769016..0000000 --- a/examples/transform/src/main/java/com/example/transform/DynamicTransformApplication.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.example.transform; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.api.StatsListener; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.common.partitioner.StatelessPartitioner; -import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner; -import com.datatorrent.lib.transform.TransformOperator; - -@ApplicationAnnotation(name="DynamicTransformApp") -public class DynamicTransformApplication implements StreamingApplication -{ - private static String COOL_DOWN_MILLIS = "dt.cooldown"; - private static String MAX_THROUGHPUT = "dt.maxThroughput"; - private static String MIN_THROUGHPUT = "dt.minThroughput"; - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - POJOGenerator input = dag.addOperator("Input", new POJOGenerator()); - TransformOperator transform = dag.addOperator("Process", new TransformOperator()); - // Set expression map - Map<String, String> expMap = new HashMap<>(); - expMap.put("name", "{$.firstName}.concat(\" \").concat({$.lastName})"); - expMap.put("age", "(new java.util.Date()).getYear() - {$.dateOfBirth}.getYear()"); - expMap.put("address", "{$.address}.toLowerCase()"); - transform.setExpressionMap(expMap); - ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator()); - - dag.addStream("InputToTransform", input.output, transform.input); - dag.addStream("TransformToOutput", transform.output, output.input); - - dag.setInputPortAttribute(transform.input, Context.PortContext.TUPLE_CLASS, CustomerEvent.class); - dag.setOutputPortAttribute(transform.output, Context.PortContext.TUPLE_CLASS, CustomerInfo.class); - - StatelessThroughputBasedPartitioner<TransformOperator> partitioner = new StatelessThroughputBasedPartitioner<>(); - partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000)); - partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000)); - partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000)); - dag.setAttribute(transform, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner})); - dag.setAttribute(transform, Context.OperatorContext.PARTITIONER, partitioner); - } -}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/transform/src/main/java/com/example/transform/POJOGenerator.java ---------------------------------------------------------------------- diff --git a/examples/transform/src/main/java/com/example/transform/POJOGenerator.java b/examples/transform/src/main/java/com/example/transform/POJOGenerator.java deleted file mode 100644 index 9db5fd1..0000000 --- a/examples/transform/src/main/java/com/example/transform/POJOGenerator.java +++ /dev/null @@ -1,125 +0,0 @@ -package com.example.transform; - -import java.util.Date; -import java.util.Random; - -import javax.validation.constraints.Min; - -import org.apache.commons.lang3.RandomStringUtils; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; - -/** - * Generates and emits the CustomerEvent - */ -public class POJOGenerator implements InputOperator -{ - @Min(1) - private int maxCustomerId = 100000; - @Min(1) - private int maxNameLength = 10; - @Min(1) - private int maxAddressLength = 15; - private long tuplesCounter; - // Limit number of emitted tuples per window - @Min(1) - private long maxTuplesPerWindow = 100; - private final Random random = new Random(); - private final RandomStringUtils rRandom = new RandomStringUtils(); - public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>(); - - @Override - public void beginWindow(long windowId) - { - tuplesCounter = 0; - } - - @Override - public void endWindow() - { - } - - @Override - public void setup(Context.OperatorContext context) - { - } - - @Override - public void teardown() - { - - } - - CustomerEvent generateCustomersEvent() throws Exception { - - CustomerEvent customerEvent = new CustomerEvent(); - customerEvent.setCustomerId(randomId(maxCustomerId)); - customerEvent.setFirstName(rRandom.randomAlphabetic(randomId(maxNameLength))); - customerEvent.setLastName(rRandom.randomAlphabetic(randomId(maxNameLength))); - long val = random.nextLong(); - long diff1 = val % System.currentTimeMillis(); - customerEvent.setDateOfBirth(new Date(diff1)); - customerEvent.setAddress(rRandom.randomAlphabetic(randomId(maxAddressLength))); - return customerEvent; - } - - private int randomId(int max) { - if (max < 1) return 1; - return 1 + random.nextInt(max); - } - - @Override - public void emitTuples() - { - while (tuplesCounter++ < maxTuplesPerWindow) { - try { - CustomerEvent event = generateCustomersEvent(); - this.output.emit(event); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - } - - public long getMaxTuplesPerWindow() - { - return maxTuplesPerWindow; - } - - public void setMaxTuplesPerWindow(long maxTuplesPerWindow) - { - this.maxTuplesPerWindow = maxTuplesPerWindow; - } - - public int getMaxAddressLength() - { - return maxAddressLength; - } - - public void setMaxAddressLength(int maxAddressLength) - { - this.maxAddressLength = maxAddressLength; - } - - public int getMaxNameLength() - { - return maxNameLength; - } - - public void setMaxNameLength(int maxNameLength) - { - this.maxNameLength = maxNameLength; - } - - public int getMaxCustomerId() - { - return maxCustomerId; - } - - public void setMaxCustomerId(int maxCustomerId) - { - this.maxCustomerId = maxCustomerId; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/transform/src/main/java/org/apache/apex/examples/transform/Application.java ---------------------------------------------------------------------- diff --git a/examples/transform/src/main/java/org/apache/apex/examples/transform/Application.java b/examples/transform/src/main/java/org/apache/apex/examples/transform/Application.java new file mode 100644 index 0000000..e6a0419 --- /dev/null +++ b/examples/transform/src/main/java/org/apache/apex/examples/transform/Application.java @@ -0,0 +1,39 @@ +package org.apache.apex.examples.transform; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.partitioner.StatelessPartitioner; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.transform.TransformOperator; + +@ApplicationAnnotation(name="TransformExample") +public class Application implements StreamingApplication +{ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + POJOGenerator input = dag.addOperator("Input", new POJOGenerator()); + TransformOperator transform = dag.addOperator("Process", new TransformOperator()); + // Set expression map + Map<String, String> expMap = new HashMap<>(); + expMap.put("name", "{$.firstName}.concat(\" \").concat({$.lastName})"); + expMap.put("age", "(new java.util.Date()).getYear() - {$.dateOfBirth}.getYear()"); + expMap.put("address", "{$.address}.toLowerCase()"); + transform.setExpressionMap(expMap); + ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator()); + + dag.addStream("InputToTransform", input.output, transform.input); + dag.addStream("TransformToOutput", transform.output, output.input); + + dag.setInputPortAttribute(transform.input, Context.PortContext.TUPLE_CLASS, CustomerEvent.class); + dag.setOutputPortAttribute(transform.output, Context.PortContext.TUPLE_CLASS, CustomerInfo.class); + dag.setAttribute(transform, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<TransformOperator>(2)); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/transform/src/main/java/org/apache/apex/examples/transform/CustomerEvent.java ---------------------------------------------------------------------- diff --git a/examples/transform/src/main/java/org/apache/apex/examples/transform/CustomerEvent.java b/examples/transform/src/main/java/org/apache/apex/examples/transform/CustomerEvent.java new file mode 100644 index 0000000..8011e12 --- /dev/null +++ b/examples/transform/src/main/java/org/apache/apex/examples/transform/CustomerEvent.java @@ -0,0 +1,74 @@ +package org.apache.apex.examples.transform; + +import java.util.Date; + +public class CustomerEvent +{ + private int customerId; + private String firstName; + private String lastName; + private Date dateOfBirth; + private String address; + + public int getCustomerId() + { + return customerId; + } + + public void setCustomerId(int customerId) + { + this.customerId = customerId; + } + + public String getFirstName() + { + return firstName; + } + + public void setFirstName(String firstName) + { + this.firstName = firstName; + } + + public String getLastName() + { + return lastName; + } + + public void setLastName(String lastName) + { + this.lastName = lastName; + } + + public Date getDateOfBirth() + { + return dateOfBirth; + } + + public void setDateOfBirth(Date dateOfBirth) + { + this.dateOfBirth = dateOfBirth; + } + + public String getAddress() + { + return address; + } + + public void setAddress(String address) + { + this.address = address; + } + + @Override + public String toString() + { + return "CustomerEvent{" + + "customerId=" + customerId + + ", firstName='" + firstName + '\'' + + ", lastName='" + lastName + '\'' + + ", dateOfBirth=" + dateOfBirth + + ", address='" + address + '\'' + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/transform/src/main/java/org/apache/apex/examples/transform/CustomerInfo.java ---------------------------------------------------------------------- diff --git a/examples/transform/src/main/java/org/apache/apex/examples/transform/CustomerInfo.java b/examples/transform/src/main/java/org/apache/apex/examples/transform/CustomerInfo.java new file mode 100644 index 0000000..ece76a6 --- /dev/null +++ b/examples/transform/src/main/java/org/apache/apex/examples/transform/CustomerInfo.java @@ -0,0 +1,60 @@ +package org.apache.apex.examples.transform; + +public class CustomerInfo +{ + private int customerId; + private String name; + private int age; + private String address; + + public int getCustomerId() + { + return customerId; + } + + public void setCustomerId(int customerId) + { + this.customerId = customerId; + } + + public String getName() + { + return name; + } + + public void setName(String name) + { + this.name = name; + } + + public int getAge() + { + return age; + } + + public void setAge(int age) + { + this.age = age; + } + + public String getAddress() + { + return address; + } + + public void setAddress(String address) + { + this.address = address; + } + + @Override + public String toString() + { + return "CustomerInfo{" + + "customerId=" + customerId + + ", name='" + name + '\'' + + ", age=" + age + + ", address='" + address + '\'' + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/transform/src/main/java/org/apache/apex/examples/transform/DynamicTransformApplication.java ---------------------------------------------------------------------- diff --git a/examples/transform/src/main/java/org/apache/apex/examples/transform/DynamicTransformApplication.java b/examples/transform/src/main/java/org/apache/apex/examples/transform/DynamicTransformApplication.java new file mode 100644 index 0000000..01bc446 --- /dev/null +++ b/examples/transform/src/main/java/org/apache/apex/examples/transform/DynamicTransformApplication.java @@ -0,0 +1,51 @@ +package org.apache.apex.examples.transform; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.StatsListener; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner; +import com.datatorrent.lib.transform.TransformOperator; + +@ApplicationAnnotation(name="DynamicTransformApp") +public class DynamicTransformApplication implements StreamingApplication +{ + private static String COOL_DOWN_MILLIS = "dt.cooldown"; + private static String MAX_THROUGHPUT = "dt.maxThroughput"; + private static String MIN_THROUGHPUT = "dt.minThroughput"; + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + POJOGenerator input = dag.addOperator("Input", new POJOGenerator()); + TransformOperator transform = dag.addOperator("Process", new TransformOperator()); + // Set expression map + Map<String, String> expMap = new HashMap<>(); + expMap.put("name", "{$.firstName}.concat(\" \").concat({$.lastName})"); + expMap.put("age", "(new java.util.Date()).getYear() - {$.dateOfBirth}.getYear()"); + expMap.put("address", "{$.address}.toLowerCase()"); + transform.setExpressionMap(expMap); + ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator()); + + dag.addStream("InputToTransform", input.output, transform.input); + dag.addStream("TransformToOutput", transform.output, output.input); + + dag.setInputPortAttribute(transform.input, Context.PortContext.TUPLE_CLASS, CustomerEvent.class); + dag.setOutputPortAttribute(transform.output, Context.PortContext.TUPLE_CLASS, CustomerInfo.class); + + StatelessThroughputBasedPartitioner<TransformOperator> partitioner = new StatelessThroughputBasedPartitioner<>(); + partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000)); + partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000)); + partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000)); + dag.setAttribute(transform, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner})); + dag.setAttribute(transform, Context.OperatorContext.PARTITIONER, partitioner); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/transform/src/main/java/org/apache/apex/examples/transform/POJOGenerator.java ---------------------------------------------------------------------- diff --git a/examples/transform/src/main/java/org/apache/apex/examples/transform/POJOGenerator.java b/examples/transform/src/main/java/org/apache/apex/examples/transform/POJOGenerator.java new file mode 100644 index 0000000..f8f3b22 --- /dev/null +++ b/examples/transform/src/main/java/org/apache/apex/examples/transform/POJOGenerator.java @@ -0,0 +1,125 @@ +package org.apache.apex.examples.transform; + +import java.util.Date; +import java.util.Random; + +import javax.validation.constraints.Min; + +import org.apache.commons.lang3.RandomStringUtils; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; + +/** + * Generates and emits the CustomerEvent + */ +public class POJOGenerator implements InputOperator +{ + @Min(1) + private int maxCustomerId = 100000; + @Min(1) + private int maxNameLength = 10; + @Min(1) + private int maxAddressLength = 15; + private long tuplesCounter; + // Limit number of emitted tuples per window + @Min(1) + private long maxTuplesPerWindow = 100; + private final Random random = new Random(); + private final RandomStringUtils rRandom = new RandomStringUtils(); + public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>(); + + @Override + public void beginWindow(long windowId) + { + tuplesCounter = 0; + } + + @Override + public void endWindow() + { + } + + @Override + public void setup(Context.OperatorContext context) + { + } + + @Override + public void teardown() + { + + } + + CustomerEvent generateCustomersEvent() throws Exception { + + CustomerEvent customerEvent = new CustomerEvent(); + customerEvent.setCustomerId(randomId(maxCustomerId)); + customerEvent.setFirstName(rRandom.randomAlphabetic(randomId(maxNameLength))); + customerEvent.setLastName(rRandom.randomAlphabetic(randomId(maxNameLength))); + long val = random.nextLong(); + long diff1 = val % System.currentTimeMillis(); + customerEvent.setDateOfBirth(new Date(diff1)); + customerEvent.setAddress(rRandom.randomAlphabetic(randomId(maxAddressLength))); + return customerEvent; + } + + private int randomId(int max) { + if (max < 1) return 1; + return 1 + random.nextInt(max); + } + + @Override + public void emitTuples() + { + while (tuplesCounter++ < maxTuplesPerWindow) { + try { + CustomerEvent event = generateCustomersEvent(); + this.output.emit(event); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + } + + public long getMaxTuplesPerWindow() + { + return maxTuplesPerWindow; + } + + public void setMaxTuplesPerWindow(long maxTuplesPerWindow) + { + this.maxTuplesPerWindow = maxTuplesPerWindow; + } + + public int getMaxAddressLength() + { + return maxAddressLength; + } + + public void setMaxAddressLength(int maxAddressLength) + { + this.maxAddressLength = maxAddressLength; + } + + public int getMaxNameLength() + { + return maxNameLength; + } + + public void setMaxNameLength(int maxNameLength) + { + this.maxNameLength = maxNameLength; + } + + public int getMaxCustomerId() + { + return maxCustomerId; + } + + public void setMaxCustomerId(int maxCustomerId) + { + this.maxCustomerId = maxCustomerId; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/transform/src/test/java/com/example/transform/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/transform/src/test/java/com/example/transform/ApplicationTest.java b/examples/transform/src/test/java/com/example/transform/ApplicationTest.java deleted file mode 100644 index 2d331d2..0000000 --- a/examples/transform/src/test/java/com/example/transform/ApplicationTest.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.example.transform; - -import org.junit.Test; -import org.apache.hadoop.conf.Configuration; -import com.datatorrent.api.LocalMode; - -public class ApplicationTest -{ - @Test - public void testApplication() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); - lma.prepareDAG(new Application(), conf); - LocalMode.Controller lc = lma.getController(); - lc.runAsync(); - Thread.sleep(10 * 1000); - lc.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/transform/src/test/java/org/apache/apex/examples/transform/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/transform/src/test/java/org/apache/apex/examples/transform/ApplicationTest.java b/examples/transform/src/test/java/org/apache/apex/examples/transform/ApplicationTest.java new file mode 100644 index 0000000..aab8af8 --- /dev/null +++ b/examples/transform/src/test/java/org/apache/apex/examples/transform/ApplicationTest.java @@ -0,0 +1,21 @@ +package org.apache.apex.examples.transform; + +import org.junit.Test; +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.LocalMode; + +public class ApplicationTest +{ + @Test + public void testApplication() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); + lma.prepareDAG(new Application(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + Thread.sleep(10 * 1000); + lc.shutdown(); + } +}
