http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrich/JDBCLoaderTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/JDBCLoaderTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/JDBCLoaderTest.java new file mode 100644 index 0000000..f450736 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/JDBCLoaderTest.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.enrich; + +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.netlet.util.DTThrowable; + +public class JDBCLoaderTest +{ + static final Logger logger = LoggerFactory.getLogger(JDBCLoaderTest.class); + + public static class TestMeta extends TestWatcher + { + JDBCLoader dbloader; + int[] id = {1, 2, 3, 4}; + String[] name = {"Paul", "Allen", "Teddy", "Mark"}; + int[] age = {32, 25, 23, 25}; + String[] address = {"California", "Texas", "Norway", "Rich-Mond"}; + double[] salary = {20000.00, 15000.00, 20000.00, 65000.00}; + + @Override + protected void starting(Description description) + { + try { + dbloader = new JDBCLoader(); + dbloader.setDatabaseDriver("org.hsqldb.jdbcDriver"); + dbloader.setDatabaseUrl("jdbc:hsqldb:mem:test;sql.syntax_mys=true"); + dbloader.setTableName("COMPANY"); + + dbloader.connect(); + createTable(); + insertRecordsInTable(); + } catch (Throwable e) { + DTThrowable.rethrow(e); + } + } + + private void createTable() + { + try { + Statement stmt = dbloader.getConnection().createStatement(); + + String createTable = "CREATE TABLE " + dbloader.getTableName() + " " + + "(ID INT PRIMARY KEY, " + + "NAME CHAR(50), " + + "AGE INT, " + + "ADDRESS CHAR(50), " + + "SALARY REAL)"; + logger.debug(createTable); + stmt.executeUpdate(createTable); + + logger.debug("Table created successfully..."); + } catch (Throwable e) { + DTThrowable.rethrow(e); + } + } + + private void insertRecordsInTable() + { + try { + Statement stmt = dbloader.getConnection().createStatement(); + String tbName = dbloader.getTableName(); + + for (int i = 0; i < id.length; i++) { + String sql = "INSERT INTO " + tbName + " (ID,NAME,AGE,ADDRESS,SALARY) " + + "VALUES (" + id[i] + ", '" + name[i] + "', " + age[i] + ", '" + address[i] + "', " + salary[i] + " );"; + stmt.executeUpdate(sql); + } + } catch (Throwable e) { + DTThrowable.rethrow(e); + } + + } + + private void cleanTable() + { + String sql = "DROP TABLE " + dbloader.tableName; + try { + Statement stmt = dbloader.getConnection().createStatement(); + stmt.executeUpdate(sql); + logger.debug("Table deleted successfully..."); + } catch (SQLException e) { + DTThrowable.rethrow(e); + } + } + + @Override + protected void finished(Description description) + { + cleanTable(); + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Test + public void testMysqlDBLookup() throws Exception + { + CountDownLatch latch = new CountDownLatch(1); + + ArrayList<FieldInfo> lookupKeys = new ArrayList<>(); + lookupKeys.add(new FieldInfo("ID", "ID", FieldInfo.SupportType.INTEGER)); + ArrayList<FieldInfo> includeKeys = new ArrayList<>(); + includeKeys.add(new FieldInfo("NAME", "NAME", FieldInfo.SupportType.STRING)); + includeKeys.add(new FieldInfo("AGE", "AGE", FieldInfo.SupportType.INTEGER)); + includeKeys.add(new FieldInfo("ADDRESS", "ADDRESS", FieldInfo.SupportType.STRING)); + + testMeta.dbloader.setFieldInfo(lookupKeys, includeKeys); + + latch.await(1000, TimeUnit.MILLISECONDS); + + ArrayList<Object> keys = new ArrayList<>(); + keys.add(4); + + ArrayList<Object> columnInfo = (ArrayList<Object>)testMeta.dbloader.get(keys); + + Assert.assertEquals("NAME", "Mark", columnInfo.get(0).toString().trim()); + Assert.assertEquals("AGE", 25, columnInfo.get(1)); + Assert.assertEquals("ADDRESS", "Rich-Mond", columnInfo.get(2).toString().trim()); + } + + @Test + public void testMysqlDBLookupIncludeAllKeys() throws Exception + { + CountDownLatch latch = new CountDownLatch(1); + + ArrayList<FieldInfo> lookupKeys = new ArrayList<>(); + lookupKeys.add(new FieldInfo("ID", "ID", FieldInfo.SupportType.INTEGER)); + + ArrayList<FieldInfo> includeKeys = new ArrayList<>(); + includeKeys.add(new FieldInfo("ID", "ID", FieldInfo.SupportType.INTEGER)); + includeKeys.add(new FieldInfo("NAME", "NAME", FieldInfo.SupportType.STRING)); + includeKeys.add(new FieldInfo("AGE", "AGE", FieldInfo.SupportType.INTEGER)); + includeKeys.add(new FieldInfo("ADDRESS", "ADDRESS", FieldInfo.SupportType.STRING)); + includeKeys.add(new FieldInfo("SALARY", "SALARY", FieldInfo.SupportType.DOUBLE)); + + testMeta.dbloader.setFieldInfo(lookupKeys, includeKeys); + + latch.await(1000, TimeUnit.MILLISECONDS); + + ArrayList<Object> keys = new ArrayList<Object>(); + keys.add(4); + + ArrayList<Object> columnInfo = (ArrayList<Object>)testMeta.dbloader.get(keys); + + Assert.assertEquals("ID", 4, columnInfo.get(0)); + Assert.assertEquals("NAME", "Mark", columnInfo.get(1).toString().trim()); + Assert.assertEquals("AGE", 25, columnInfo.get(2)); + Assert.assertEquals("ADDRESS", "Rich-Mond", columnInfo.get(3).toString().trim()); + Assert.assertEquals("SALARY", 65000.0, columnInfo.get(4)); + } + + @Test + public void testMysqlDBQuery() throws Exception + { + CountDownLatch latch = new CountDownLatch(1); + + testMeta.dbloader + .setQueryStmt("Select id, name from " + testMeta.dbloader.getTableName() + " where AGE = ? and ADDRESS = ?"); + + latch.await(1000, TimeUnit.MILLISECONDS); + + ArrayList<FieldInfo> includeKeys = new ArrayList<>(); + includeKeys.add(new FieldInfo("ID", "ID", FieldInfo.SupportType.INTEGER)); + includeKeys.add(new FieldInfo("NAME", "NAME", FieldInfo.SupportType.STRING)); + + testMeta.dbloader.setFieldInfo(null, includeKeys); + + ArrayList<Object> keys = new ArrayList<Object>(); + keys.add(25); + keys.add("Texas"); + + ArrayList<Object> columnInfo = (ArrayList<Object>)testMeta.dbloader.get(keys); + + Assert.assertEquals("ID", 2, columnInfo.get(0)); + Assert.assertEquals("NAME", "Allen", columnInfo.get(1).toString().trim()); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrich/MapEnricherTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/MapEnricherTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/MapEnricherTest.java new file mode 100644 index 0000000..7323f0d --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/MapEnricherTest.java @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.enrich; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Maps; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.FieldInfo; +import com.datatorrent.lib.util.TestUtils; + +public class MapEnricherTest +{ + @Test + public void includeAllKeys() + { + MapEnricher oper = new MapEnricher(); + oper.setStore(new MemoryStore()); + oper.setLookupFields(Arrays.asList("In1")); + oper.setup(null); + + CollectorTestSink sink = new CollectorTestSink(); + TestUtils.setSink(oper.output, sink); + + Map<String, Object> inMap = Maps.newHashMap(); + inMap.put("In1", "Value1"); + inMap.put("In2", "Value2"); + + oper.activate(null); + oper.beginWindow(1); + oper.input.process(inMap); + oper.endWindow(); + oper.deactivate(); + + Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size()); + Assert.assertEquals("Enrich Tuple: ", "{A=Val_A, B=Val_B, C=Val_C, In2=Value2, In1=Value3}", + sink.collectedTuples.get(0).toString()); + } + + @Test + public void includeSelectedKeys() + { + MapEnricher oper = new MapEnricher(); + oper.setStore(new MemoryStore()); + oper.setLookupFields(Arrays.asList("In1")); + oper.setIncludeFields(Arrays.asList("A", "B")); + oper.setup(null); + + CollectorTestSink sink = new CollectorTestSink(); + TestUtils.setSink(oper.output, sink); + + Map<String, Object> inMap = Maps.newHashMap(); + inMap.put("In1", "Value1"); + inMap.put("In2", "Value2"); + + oper.activate(null); + oper.beginWindow(1); + oper.input.process(inMap); + oper.endWindow(); + oper.deactivate(); + + Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size()); + Assert.assertEquals("Enrich Tuple: ", "{A=Val_A, B=Val_B, In2=Value2, In1=Value1}", + sink.collectedTuples.get(0).toString()); + } + + @Test + public void testApplication() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + lma.prepareDAG(new EnrichApplication(), conf); + LocalMode.Controller lc = lma.getController(); + lc.run(10000);// runs for 10 seconds and quits + } + + public static class EnrichApplication implements StreamingApplication + { + @Override + public void populateDAG(DAG dag, Configuration configuration) + { + RandomMapGenerator input = dag.addOperator("Input", RandomMapGenerator.class); + MapEnricher enrich = dag.addOperator("Enrich", MapEnricher.class); + ConsoleOutputOperator console = dag.addOperator("Console", ConsoleOutputOperator.class); + console.setSilent(true); + + List<String> includeFields = new ArrayList<>(); + includeFields.add("A"); + includeFields.add("B"); + List<String> lookupFields = new ArrayList<>(); + lookupFields.add("In1"); + + enrich.setStore(new MemoryStore()); + enrich.setIncludeFields(includeFields); + enrich.setLookupFields(lookupFields); + + dag.addStream("S1", input.output, enrich.input); + dag.addStream("S2", enrich.output, console.input); + } + } + + public static class RandomMapGenerator extends BaseOperator implements InputOperator + { + private int key = 0; + + public final transient DefaultOutputPort output = new DefaultOutputPort(); + + @Override + public void emitTuples() + { + Map<String, String> map = new HashMap<>(); + map.put("In" + (key + 1), "Value" + (key + 1)); + map.put("In2", "Value3"); + output.emit(map); + } + } + + private static class MemoryStore implements BackendLoader + { + static Map<String, Map> returnData = Maps.newHashMap(); + private List<FieldInfo> includeFieldInfo; + + static { + Map<String, String> map = Maps.newHashMap(); + map.put("A", "Val_A"); + map.put("B", "Val_B"); + map.put("C", "Val_C"); + map.put("In1", "Value3"); + returnData.put("Value1", map); + + map = Maps.newHashMap(); + map.put("A", "Val_A_1"); + map.put("B", "Val_B_1"); + map.put("C", "Val_C"); + map.put("In1", "Value3"); + returnData.put("Value2", map); + } + + @Override + public Map<Object, Object> loadInitialData() + { + return null; + } + + @Override + public Object get(Object key) + { + List<String> keyList = (List<String>)key; + Map<String, String> keyValue = returnData.get(keyList.get(0)); + ArrayList<Object> lst = new ArrayList<Object>(); + + if (CollectionUtils.isEmpty(includeFieldInfo)) { + if (includeFieldInfo == null) { + includeFieldInfo = new ArrayList<>(); + } + for (Map.Entry<String, String> entry : keyValue.entrySet()) { + // TODO: Identify the types.. + includeFieldInfo.add(new FieldInfo(entry.getKey(), entry.getKey(), FieldInfo.SupportType.OBJECT)); + } + } + + for (FieldInfo fieldInfo : includeFieldInfo) { + lst.add(keyValue.get(fieldInfo.getColumnName())); + } + + return lst; + } + + @Override + public List<Object> getAll(List<Object> keys) + { + return null; + } + + @Override + public void put(Object key, Object value) + { + + } + + @Override + public void putAll(Map<Object, Object> m) + { + + } + + @Override + public void remove(Object key) + { + + } + + @Override + public void connect() throws IOException + { + + } + + @Override + public void disconnect() throws IOException + { + + } + + @Override + public boolean isConnected() + { + return false; + } + + @Override + public void setFieldInfo(List<FieldInfo> lookupFieldInfo, List<FieldInfo> includeFieldInfo) + { + this.includeFieldInfo = includeFieldInfo; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrich/Order.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/Order.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/Order.java new file mode 100644 index 0000000..52aa698 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/Order.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.enrich; + +/** + * Input Class for POJO Enricher test. + */ +public class Order +{ + public int OID; + public int ID; + public double amount; + + public Order() + { + // for kryo + } + + public Order(int oid, int id, double amount) + { + this.OID = oid; + this.ID = id; + this.amount = amount; + } + + public int getOID() + { + return OID; + } + + public void setOID(int OID) + { + this.OID = OID; + } + + public int getID() + { + return ID; + } + + public void setID(int ID) + { + this.ID = ID; + } + + public double getAmount() + { + return amount; + } + + public void setAmount(double amount) + { + this.amount = amount; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrich/POJOEnricherTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/POJOEnricherTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/POJOEnricherTest.java new file mode 100644 index 0000000..7a6bc27 --- /dev/null +++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/POJOEnricherTest.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.enrich; + +import java.util.ArrayList; +import java.util.Arrays; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +public class POJOEnricherTest extends JDBCLoaderTest +{ + @Test + public void includeSelectedKeys() + { + POJOEnricher oper = new POJOEnricher(); + oper.setStore(testMeta.dbloader); + oper.setLookupFields(Arrays.asList("ID")); + oper.setIncludeFields(Arrays.asList("NAME", "AGE", "ADDRESS")); + oper.outputClass = EmployeeOrder.class; + oper.inputClass = Order.class; + oper.setup(null); + + CollectorTestSink sink = new CollectorTestSink(); + TestUtils.setSink(oper.output, sink); + + oper.activate(null); + + oper.beginWindow(1); + Order tuple = new Order(3, 4, 700); + oper.input.process(tuple); + oper.endWindow(); + + oper.deactivate(); + + Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size()); + Assert.assertEquals("Ouput Tuple: ", + "{OID=3, ID=4, amount=700.0, NAME='Mark', AGE=25, ADDRESS='Rich-Mond', SALARY=0.0}", + sink.collectedTuples.get(0).toString()); + } + + @Test + public void includeAllKeys() + { + POJOEnricher oper = new POJOEnricher(); + oper.setStore(testMeta.dbloader); + oper.setLookupFields(Arrays.asList("ID")); + oper.setIncludeFields(Arrays.asList("NAME", "AGE", "ADDRESS", "SALARY")); + oper.outputClass = EmployeeOrder.class; + oper.inputClass = Order.class; + oper.setup(null); + + CollectorTestSink sink = new CollectorTestSink(); + TestUtils.setSink(oper.output, sink); + + oper.activate(null); + + oper.beginWindow(1); + Order tuple = new Order(3, 4, 700); + oper.input.process(tuple); + oper.endWindow(); + + oper.deactivate(); + + Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size()); + Assert.assertEquals("Ouput Tuple: ", + "{OID=3, ID=4, amount=700.0, NAME='Mark', AGE=25, ADDRESS='Rich-Mond', SALARY=65000.0}", + sink.collectedTuples.get(0).toString()); + } + + @Test + public void testApplication() throws Exception + { + EnrichApplication enrichApplication = new EnrichApplication(testMeta); + enrichApplication.setLoader(testMeta.dbloader); + + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + lma.prepareDAG(enrichApplication, conf); + LocalMode.Controller lc = lma.getController(); + lc.run(10000);// runs for 10 seconds and quits + } + + public static class EnrichApplication implements StreamingApplication + { + private final TestMeta testMeta; + BackendLoader loader; + + public EnrichApplication(TestMeta testMeta) + { + this.testMeta = testMeta; + } + + @Override + public void populateDAG(DAG dag, Configuration configuration) + { + RandomPOJOGenerator input = dag.addOperator("Input", RandomPOJOGenerator.class); + POJOEnricher enrich = dag.addOperator("Enrich", POJOEnricher.class); + EnrichVerifier verify = dag.addOperator("Verify", EnrichVerifier.class); + verify.address = testMeta.address; + verify.age = testMeta.age; + verify.names = testMeta.name; + verify.salary = testMeta.salary; + + enrich.setStore(loader); + ArrayList<String> lookupFields = new ArrayList<>(); + lookupFields.add("ID"); + ArrayList<String> includeFields = new ArrayList<>(); + includeFields.add("NAME"); + includeFields.add("AGE"); + includeFields.add("ADDRESS"); + includeFields.add("SALARY"); + enrich.setLookupFields(lookupFields); + enrich.setIncludeFields(includeFields); + + dag.getMeta(enrich).getMeta(enrich.input).getAttributes().put(Context.PortContext.TUPLE_CLASS, Order.class); + dag.getMeta(enrich).getMeta(enrich.output).getAttributes() + .put(Context.PortContext.TUPLE_CLASS, EmployeeOrder.class); + + dag.addStream("S1", input.output, enrich.input); + dag.addStream("S2", enrich.output, verify.input); + } + + public void setLoader(BackendLoader loader) + { + this.loader = loader; + } + } + + public static class RandomPOJOGenerator implements InputOperator + { + public transient DefaultOutputPort<Object> output = new DefaultOutputPort<>(); + private int idx = 0; + private boolean emit = true; + + @Override + public void emitTuples() + { + if (!emit) { + return; + } + idx += idx++ % 4; + Order o = new Order(1, idx + 1, 100.00); + output.emit(o); + if (idx == 3) { + emit = false; + } + } + + @Override + public void beginWindow(long l) + { + emit = true; + } + + @Override + public void endWindow() + { + + } + + @Override + public void setup(Context.OperatorContext context) + { + } + + @Override + public void teardown() + { + } + } + + public static class EnrichVerifier extends BaseOperator + { + private static final Logger logger = LoggerFactory.getLogger(EnrichVerifier.class); + String[] names; + int[] age; + String[] address; + double[] salary; + + private transient DefaultInputPort<Object> input = new DefaultInputPort<Object>() + { + @Override + public void process(Object o) + { + Assert.assertTrue(o instanceof EmployeeOrder); + EmployeeOrder order = (EmployeeOrder)o; + int id = order.getID(); + Assert.assertTrue(id >= 1 && id <= 4); + Assert.assertEquals(1, order.getOID()); + Assert.assertEquals(100.00, order.getAmount(), 0); + + Assert.assertEquals(names[id - 1], order.getNAME()); + Assert.assertEquals(age[id - 1], order.getAGE()); + Assert.assertEquals(address[id - 1], order.getADDRESS()); + Assert.assertEquals(salary[id - 1], order.getSALARY(), 0); + } + }; + } +} + http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrichment/BeanEnrichmentOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/BeanEnrichmentOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/BeanEnrichmentOperatorTest.java deleted file mode 100644 index 308aa82..0000000 --- a/contrib/src/test/java/com/datatorrent/contrib/enrichment/BeanEnrichmentOperatorTest.java +++ /dev/null @@ -1,95 +0,0 @@ -package com.datatorrent.contrib.enrichment; - -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.TestUtils; -import org.junit.Assert; -import org.junit.Test; - -public class BeanEnrichmentOperatorTest extends JDBCLoaderTest -{ - public class Order { - public int OID; - public int ID; - public double amount; - - public Order(int oid, int id, double amount) { - this.OID = oid; - this.ID = id; - this.amount = amount; - } - public int getOID() - { - return OID; - } - - public void setOID(int OID) - { - this.OID = OID; - } - - public int getID() - { - return ID; - } - - public void setID(int ID) - { - this.ID = ID; - } - - public double getAmount() - { - return amount; - } - - public void setAmount(double amount) - { - this.amount = amount; - } - } - - - @Test - public void includeSelectedKeys() - { - POJOEnrichmentOperator oper = new POJOEnrichmentOperator(); - oper.setStore(testMeta.dbloader); - oper.setLookupFieldsStr("ID"); - oper.setIncludeFieldsStr("NAME,AGE,ADDRESS"); - oper.outputClass = EmployeeOrder.class; - oper.setup(null); - - CollectorTestSink sink = new CollectorTestSink(); - TestUtils.setSink(oper.output, sink); - - oper.beginWindow(1); - Order tuple = new Order(3, 4, 700); - oper.input.process(tuple); - oper.endWindow(); - - Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size()); - Assert.assertEquals("Ouput Tuple: ", "{OID=3, ID=4, amount=700.0, NAME='Mark', AGE=25, ADDRESS='Rich-Mond', SALARY=0.0}", sink.collectedTuples.get(0).toString()); - } - @Test - public void includeAllKeys() - { - POJOEnrichmentOperator oper = new POJOEnrichmentOperator(); - oper.setStore(testMeta.dbloader); - oper.setLookupFieldsStr("ID"); - oper.outputClass = EmployeeOrder.class; - oper.setup(null); - - CollectorTestSink sink = new CollectorTestSink(); - TestUtils.setSink(oper.output, sink); - - - oper.beginWindow(1); - Order tuple = new Order(3, 4, 700); - oper.input.process(tuple); - oper.endWindow(); - - Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size()); - Assert.assertEquals("Ouput Tuple: ", "{OID=3, ID=4, amount=700.0, NAME='Mark', AGE=25, ADDRESS='Rich-Mond', SALARY=65000.0}", sink.collectedTuples.get(0).toString()); - } -} - http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrichment/EmployeeOrder.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/EmployeeOrder.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/EmployeeOrder.java deleted file mode 100644 index 00c9d82..0000000 --- a/contrib/src/test/java/com/datatorrent/contrib/enrichment/EmployeeOrder.java +++ /dev/null @@ -1,95 +0,0 @@ -package com.datatorrent.contrib.enrichment; - -// This class is needed for Bean Enrichment Operator testing -public class EmployeeOrder { - public int OID; - public int ID; - public double amount; - public String NAME; - public int AGE; - public String ADDRESS; - public double SALARY; - - public int getOID() - { - return OID; - } - - public void setOID(int OID) - { - this.OID = OID; - } - - public int getID() - { - return ID; - } - - public void setID(int ID) - { - this.ID = ID; - } - - public int getAGE() - { - return AGE; - } - - public void setAGE(int AGE) - { - this.AGE = AGE; - } - - public String getNAME() - { - return NAME; - } - - public void setNAME(String NAME) - { - this.NAME = NAME; - } - - public double getAmount() - { - return amount; - } - - public void setAmount(double amount) - { - this.amount = amount; - } - - public String getADDRESS() - { - return ADDRESS; - } - - public void setADDRESS(String ADDRESS) - { - this.ADDRESS = ADDRESS; - } - - public double getSALARY() - { - return SALARY; - } - - public void setSALARY(double SALARY) - { - this.SALARY = SALARY; - } - - @Override public String toString() - { - return "{" + - "OID=" + OID + - ", ID=" + ID + - ", amount=" + amount + - ", NAME='" + NAME + '\'' + - ", AGE=" + AGE + - ", ADDRESS='" + ADDRESS.trim() + '\'' + - ", SALARY=" + SALARY + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrichment/FileEnrichmentTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/FileEnrichmentTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/FileEnrichmentTest.java deleted file mode 100644 index 934d73b..0000000 --- a/contrib/src/test/java/com/datatorrent/contrib/enrichment/FileEnrichmentTest.java +++ /dev/null @@ -1,75 +0,0 @@ -package com.datatorrent.contrib.enrichment; - -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.TestUtils; -import com.esotericsoftware.kryo.Kryo; -import com.google.common.collect.Maps; -import org.apache.commons.io.FileUtils; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.net.URL; -import java.util.Map; - - -public class FileEnrichmentTest -{ - - @Rule public final TestUtils.TestInfo testInfo = new TestUtils.TestInfo(); - - @Test public void testEnrichmentOperator() throws IOException, InterruptedException - { - URL origUrl = this.getClass().getResource("/productmapping.txt"); - - URL fileUrl = new URL(this.getClass().getResource("/").toString() + "productmapping1.txt"); - FileUtils.deleteQuietly(new File(fileUrl.getPath())); - FileUtils.copyFile(new File(origUrl.getPath()), new File(fileUrl.getPath())); - - MapEnrichmentOperator oper = new MapEnrichmentOperator(); - FSLoader store = new FSLoader(); - store.setFileName(fileUrl.toString()); - oper.setLookupFieldsStr("productId"); - oper.setStore(store); - - oper.setup(null); - - /* File contains 6 entries, but operator one entry is duplicate, - * so cache should contains only 5 entries after scanning input file. - */ - //Assert.assertEquals("Number of mappings ", 7, oper.cache.size()); - - CollectorTestSink<Map<String, Object>> sink = new CollectorTestSink<Map<String, Object>>(); - @SuppressWarnings({ "unchecked", "rawtypes" }) CollectorTestSink<Object> tmp = (CollectorTestSink) sink; - oper.output.setSink(tmp); - - oper.beginWindow(0); - Map<String, Object> tuple = Maps.newHashMap(); - tuple.put("productId", 3); - tuple.put("channelId", 4); - tuple.put("amount", 10.0); - - Kryo kryo = new Kryo(); - oper.input.process(kryo.copy(tuple)); - - oper.endWindow(); - - /* Number of tuple, emitted */ - Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size()); - Map<String, Object> emitted = sink.collectedTuples.iterator().next(); - - /* The fields present in original event is kept as it is */ - Assert.assertEquals("Number of fields in emitted tuple", 4, emitted.size()); - Assert.assertEquals("value of productId is 3", tuple.get("productId"), emitted.get("productId")); - Assert.assertEquals("value of channelId is 4", tuple.get("channelId"), emitted.get("channelId")); - Assert.assertEquals("value of amount is 10.0", tuple.get("amount"), emitted.get("amount")); - - /* Check if productCategory is added to the event */ - Assert.assertEquals("productCategory is part of tuple", true, emitted.containsKey("productCategory")); - Assert.assertEquals("value of product category is 1", 5, emitted.get("productCategory")); - - } -} - http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrichment/HBaseLoaderTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/HBaseLoaderTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/HBaseLoaderTest.java deleted file mode 100644 index 07be982..0000000 --- a/contrib/src/test/java/com/datatorrent/contrib/enrichment/HBaseLoaderTest.java +++ /dev/null @@ -1,162 +0,0 @@ -package com.datatorrent.contrib.enrichment; - -import com.datatorrent.netlet.util.DTThrowable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestWatcher; -import org.junit.runner.Description; -import org.slf4j.LoggerFactory; - -public class HBaseLoaderTest -{ - static final org.slf4j.Logger logger = LoggerFactory.getLogger(HBaseLoaderTest.class); - - public static class TestMeta extends TestWatcher - { - - HBaseLoader dbloader; - @Override - protected void starting(Description description) - { - try { - dbloader = new HBaseLoader(); - Configuration conf = HBaseConfiguration.create(); - conf.addResource(new Path("file:///home/chaitanya/hbase-site.xml")); - - dbloader.setConfiguration(conf); - dbloader.setZookeeperQuorum("localhost"); - dbloader.setZookeeperClientPort(2181); - - dbloader.setTableName("EMPLOYEE"); - - dbloader.connect(); - createTable(); - insertRecordsInTable(); - } - catch (Throwable e) { - DTThrowable.rethrow(e); - } - } - - private void createTable() - { - try { - String[] familys = { "personal", "professional" }; - HBaseAdmin admin = new HBaseAdmin(dbloader.getConfiguration()); - HTableDescriptor tableDesc = new HTableDescriptor(dbloader.getTableName()); - for (int i = 0; i < familys.length; i++) { - tableDesc.addFamily(new HColumnDescriptor(familys[i])); - } - admin.createTable(tableDesc); - - logger.debug("Table created successfully..."); - } - catch (Throwable e) { - DTThrowable.rethrow(e); - } - } - - @SuppressWarnings("deprecation") - public void addRecord(String rowKey, String family, String qualifier, String value) throws Exception { - try { - HTable table = new HTable(dbloader.getConfiguration(), dbloader.getTableName()); - Put put = new Put(Bytes.toBytes(rowKey)); - put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes - .toBytes(value)); - table.put(put); - } catch (IOException e) { - DTThrowable.rethrow(e); - } - } - private void insertRecordsInTable() - { - try { - addRecord("row1", "personal", "name", "raju"); - addRecord("row1", "personal", "city", "hyderabad"); - addRecord("row1", "professional", "designation", "manager"); - addRecord("row1", "professional", "Salary", "50000"); - - addRecord("row2", "personal", "name", "ravi"); - addRecord("row2", "personal", "city", "Chennai"); - addRecord("row2", "professional", "designation", "SE"); - addRecord("row2", "professional", "Salary", "30000"); - - addRecord("row3", "personal", "name", "rajesh"); - addRecord("row3", "personal", "city", "Delhi"); - addRecord("row3", "professional", "designation", "E"); - addRecord("row3", "professional", "Salary", "10000"); - } - catch (Throwable e) { - DTThrowable.rethrow(e); - } - - } - - private void cleanTable() - { - String sql = "delete from " + dbloader.getTableName(); - try { - HBaseAdmin admin = new HBaseAdmin(dbloader.getConfiguration()); - admin.disableTable(dbloader.getTableName()); - admin.deleteTable(dbloader.getTableName()); - } catch (MasterNotRunningException e) { - e.printStackTrace(); - } catch (ZooKeeperConnectionException e) { - e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - @Override - protected void finished(Description description) - { - cleanTable(); - } - } - - @Rule - public TestMeta testMeta = new TestMeta(); - - @Test - public void testHBaseLookup() throws Exception - { - CountDownLatch latch = new CountDownLatch(1); - - ArrayList<String> includeKeys = new ArrayList<String>(); - includeKeys.add("city"); - includeKeys.add("Salary"); - ArrayList<String> lookupKeys = new ArrayList<String>(); - lookupKeys.add("ID"); - testMeta.dbloader.setFields(lookupKeys, includeKeys); - - String includeFamilyStr = "personal, professional"; - testMeta.dbloader.setIncludeFamilyStr(includeFamilyStr); - - latch.await(1000, TimeUnit.MILLISECONDS); - - ArrayList<Object> keys = new ArrayList<Object>(); - keys.add("row2"); - - ArrayList<Object> columnInfo = (ArrayList<Object>) testMeta.dbloader.get(keys); - - Assert.assertEquals("CITY", "Chennai", columnInfo.get(0).toString().trim()); - Assert.assertEquals("Salary", 30000, columnInfo.get(1)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrichment/JDBCLoaderTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/JDBCLoaderTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/JDBCLoaderTest.java deleted file mode 100644 index 72cfb88..0000000 --- a/contrib/src/test/java/com/datatorrent/contrib/enrichment/JDBCLoaderTest.java +++ /dev/null @@ -1,179 +0,0 @@ -package com.datatorrent.contrib.enrichment; - -import com.datatorrent.netlet.util.DTThrowable; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestWatcher; -import org.junit.runner.Description; - -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import org.slf4j.LoggerFactory; - -public class JDBCLoaderTest -{ - static final org.slf4j.Logger logger = LoggerFactory.getLogger(JDBCLoaderTest.class); - - public static class TestMeta extends TestWatcher - { - JDBCLoader dbloader; - @Override - protected void starting(Description description) - { - try { - dbloader = new JDBCLoader(); - dbloader.setDatabaseDriver("org.hsqldb.jdbcDriver"); - dbloader.setDatabaseUrl("jdbc:hsqldb:mem:test;sql.syntax_mys=true"); - dbloader.setTableName("COMPANY"); - - dbloader.connect(); - createTable(); - insertRecordsInTable(); - } - catch (Throwable e) { - DTThrowable.rethrow(e); - } - } - - private void createTable() - { - try { - Statement stmt = dbloader.getConnection().createStatement(); - - String createTable = "CREATE TABLE IF NOT EXISTS " + dbloader.getTableName() + - "(ID INT PRIMARY KEY NOT NULL," + - " NAME TEXT NOT NULL, " + - " AGE INT NOT NULL, " + - " ADDRESS CHAR(50), " + - " SALARY REAL)"; - stmt.executeUpdate(createTable); - - logger.debug("Table created successfully..."); - } - catch (Throwable e) { - DTThrowable.rethrow(e); - } - } - - private void insertRecordsInTable() - { - try { - Statement stmt = dbloader.getConnection().createStatement(); - String tbName = dbloader.getTableName(); - - String sql = "INSERT INTO " + tbName + " (ID,NAME,AGE,ADDRESS,SALARY) " + - "VALUES (1, 'Paul', 32, 'California', 20000.00 );"; - stmt.executeUpdate(sql); - - sql = "INSERT INTO " + tbName + " (ID,NAME,AGE,ADDRESS,SALARY) " + - "VALUES (2, 'Allen', 25, 'Texas', 15000.00 );"; - stmt.executeUpdate(sql); - - sql = "INSERT INTO " + tbName + " (ID,NAME,AGE,ADDRESS,SALARY) " + - "VALUES (3, 'Teddy', 23, 'Norway', 20000.00 );"; - stmt.executeUpdate(sql); - - sql = "INSERT INTO " + tbName + " (ID,NAME,AGE,ADDRESS,SALARY) " + - "VALUES (4, 'Mark', 25, 'Rich-Mond', 65000.00 );"; - stmt.executeUpdate(sql); - } - catch (Throwable e) { - DTThrowable.rethrow(e); - } - - } - - private void cleanTable() - { - String sql = "delete from " + dbloader.tableName; - try { - Statement stmt = dbloader.getConnection().createStatement(); - stmt.executeUpdate(sql); - logger.debug("Table deleted successfully..."); - } catch (SQLException e) { - DTThrowable.rethrow(e); - } - } - - @Override - protected void finished(Description description) - { - cleanTable(); - } - } - - @Rule - public TestMeta testMeta = new TestMeta(); - - @Test - public void testMysqlDBLookup() throws Exception - { - CountDownLatch latch = new CountDownLatch(1); - - ArrayList<String> lookupKeys = new ArrayList<String>(); - lookupKeys.add("ID"); - ArrayList<String> includeKeys = new ArrayList<String>(); - includeKeys.add("NAME"); - includeKeys.add("AGE"); - includeKeys.add("ADDRESS"); - testMeta.dbloader.setFields(lookupKeys, includeKeys); - - latch.await(1000, TimeUnit.MILLISECONDS); - - ArrayList<Object> keys = new ArrayList<Object>(); - keys.add("4"); - - ArrayList<Object> columnInfo = (ArrayList<Object>) testMeta.dbloader.get(keys); - - Assert.assertEquals("NAME", "Mark", columnInfo.get(0).toString().trim()); - Assert.assertEquals("AGE", 25, columnInfo.get(1)); - Assert.assertEquals("ADDRESS", "Rich-Mond", columnInfo.get(2).toString().trim()); - } - - @Test - public void testMysqlDBLookupIncludeAllKeys() throws Exception - { - CountDownLatch latch = new CountDownLatch(1); - - ArrayList<String> lookupKeys = new ArrayList<String>(); - lookupKeys.add("ID"); - ArrayList<String> includeKeys = new ArrayList<String>(); - testMeta.dbloader.setFields(lookupKeys, includeKeys); - - latch.await(1000, TimeUnit.MILLISECONDS); - - ArrayList<Object> keys = new ArrayList<Object>(); - keys.add("4"); - - ArrayList<Object> columnInfo = (ArrayList<Object>) testMeta.dbloader.get(keys); - - Assert.assertEquals("ID", 4, columnInfo.get(0)); - Assert.assertEquals("NAME", "Mark", columnInfo.get(1).toString().trim()); - Assert.assertEquals("AGE", 25, columnInfo.get(2)); - Assert.assertEquals("ADDRESS", "Rich-Mond", columnInfo.get(3).toString().trim()); - Assert.assertEquals("SALARY", 65000.0, columnInfo.get(4)); - } - - @Test - public void testMysqlDBQuery() throws Exception - { - CountDownLatch latch = new CountDownLatch(1); - - testMeta.dbloader.setQueryStmt("Select id, name from " + testMeta.dbloader.getTableName() + " where AGE = ? and ADDRESS = ?"); - - latch.await(1000, TimeUnit.MILLISECONDS); - - ArrayList<Object> keys = new ArrayList<Object>(); - keys.add("25"); - keys.add("Texas"); - - ArrayList<Object> columnInfo = (ArrayList<Object>) testMeta.dbloader.get(keys); - - Assert.assertEquals("ID", 2, columnInfo.get(0)); - Assert.assertEquals("NAME", "Allen", columnInfo.get(1).toString().trim()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperatorTest.java deleted file mode 100644 index 845fe80..0000000 --- a/contrib/src/test/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperatorTest.java +++ /dev/null @@ -1,152 +0,0 @@ -package com.datatorrent.contrib.enrichment; - -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.TestUtils; -import com.google.common.collect.Maps; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import org.apache.commons.collections.CollectionUtils; -import org.junit.Assert; -import org.junit.Test; - -public class MapEnrichmentOperatorTest -{ - @Test - public void includeAllKeys() - { - MapEnrichmentOperator oper = new MapEnrichmentOperator(); - oper.setStore(new MemoryStore()); - oper.setLookupFieldsStr("In1"); - oper.setup(null); - - CollectorTestSink sink = new CollectorTestSink(); - TestUtils.setSink(oper.output, sink); - - Map<String, Object> inMap = Maps.newHashMap(); - inMap.put("In1", "Value1"); - inMap.put("In2", "Value2"); - - oper.beginWindow(1); - oper.input.process(inMap); - oper.endWindow(); - - Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size()); - Assert.assertEquals("Enrich Tuple: ", "{A=Val_A, B=Val_B, C=Val_C, In2=Value2, In1=Value3}", sink.collectedTuples.get(0).toString()); - } - - @Test - public void includeSelectedKeys() - { - MapEnrichmentOperator oper = new MapEnrichmentOperator(); - oper.setStore(new MemoryStore()); - oper.setLookupFieldsStr("In1"); - oper.setIncludeFieldsStr("A,B"); - oper.setup(null); - - CollectorTestSink sink = new CollectorTestSink(); - TestUtils.setSink(oper.output, sink); - - Map<String, Object> inMap = Maps.newHashMap(); - inMap.put("In1", "Value1"); - inMap.put("In2", "Value2"); - - oper.beginWindow(1); - oper.input.process(inMap); - oper.endWindow(); - - Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size()); - Assert.assertEquals("Enrich Tuple: ", "{A=Val_A, B=Val_B, In2=Value2, In1=Value1}", sink.collectedTuples.get(0).toString()); - } - - private static class MemoryStore implements EnrichmentBackup - { - static Map<String, Map> returnData = Maps.newHashMap(); - private List<String> includeFields; - static { - Map<String, String> map = Maps.newHashMap(); - map.put("A", "Val_A"); - map.put("B", "Val_B"); - map.put("C", "Val_C"); - map.put("In1", "Value3"); - returnData.put("Value1", map); - - map = Maps.newHashMap(); - map.put("A", "Val_A_1"); - map.put("B", "Val_B_1"); - map.put("C", "Val_C"); - map.put("In1", "Value3"); - returnData.put("Value2", map); - } - - @Override public Map<Object, Object> loadInitialData() - { - return null; - } - - @Override public Object get(Object key) - { - List<String> keyList = (List<String>)key; - Map<String, String> keyValue = returnData.get(keyList.get(0)); - ArrayList<Object> lst = new ArrayList<Object>(); - if(CollectionUtils.isEmpty(includeFields)) { - if(includeFields == null) - includeFields = new ArrayList<String>(); - for (Map.Entry<String, String> e : keyValue.entrySet()) { - includeFields.add(e.getKey()); - } - } - for(String field : includeFields) { - lst.add(keyValue.get(field)); - } - return lst; - } - - @Override public List<Object> getAll(List<Object> keys) - { - return null; - } - - @Override public void put(Object key, Object value) - { - - } - - @Override public void putAll(Map<Object, Object> m) - { - - } - - @Override public void remove(Object key) - { - - } - - @Override public void connect() throws IOException - { - - } - - @Override public void disconnect() throws IOException - { - - } - - @Override public boolean isConnected() - { - return false; - } - - @Override public void setFields(List<String> lookupFields, List<String> includeFields) - { - this.includeFields = includeFields; - - } - - @Override - public boolean needRefresh() { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/contrib/src/test/resources/productmapping.txt ---------------------------------------------------------------------- diff --git a/contrib/src/test/resources/productmapping.txt b/contrib/src/test/resources/productmapping.txt new file mode 100755 index 0000000..ece99ff --- /dev/null +++ b/contrib/src/test/resources/productmapping.txt @@ -0,0 +1,100 @@ +{"productCategory": 5, "productId": 0} +{"productCategory": 4, "productId": 1} +{"productCategory": 5, "productId": 2} +{"productCategory": 5, "productId": 3} +{"productCategory": 5, "productId": 4} +{"productCategory": 1, "productId": 5} +{"productCategory": 2, "productId": 6} +{"productCategory": 4, "productId": 7} +{"productCategory": 2, "productId": 8} +{"productCategory": 3, "productId": 9} +{"productCategory": 1, "productId": 10} +{"productCategory": 5, "productId": 11} +{"productCategory": 5, "productId": 12} +{"productCategory": 1, "productId": 13} +{"productCategory": 1, "productId": 14} +{"productCategory": 2, "productId": 15} +{"productCategory": 3, "productId": 16} +{"productCategory": 5, "productId": 17} +{"productCategory": 2, "productId": 18} +{"productCategory": 2, "productId": 19} +{"productCategory": 2, "productId": 20} +{"productCategory": 3, "productId": 21} +{"productCategory": 2, "productId": 22} +{"productCategory": 5, "productId": 23} +{"productCategory": 4, "productId": 24} +{"productCategory": 1, "productId": 25} +{"productCategory": 3, "productId": 26} +{"productCategory": 3, "productId": 27} +{"productCategory": 3, "productId": 28} +{"productCategory": 5, "productId": 29} +{"productCategory": 2, "productId": 30} +{"productCategory": 3, "productId": 31} +{"productCategory": 3, "productId": 32} +{"productCategory": 3, "productId": 33} +{"productCategory": 1, "productId": 34} +{"productCategory": 3, "productId": 35} +{"productCategory": 2, "productId": 36} +{"productCategory": 1, "productId": 37} +{"productCategory": 3, "productId": 38} +{"productCategory": 2, "productId": 39} +{"productCategory": 1, "productId": 40} +{"productCategory": 5, "productId": 41} +{"productCategory": 3, "productId": 42} +{"productCategory": 5, "productId": 43} +{"productCategory": 2, "productId": 44} +{"productCategory": 4, "productId": 45} +{"productCategory": 5, "productId": 46} +{"productCategory": 2, "productId": 47} +{"productCategory": 3, "productId": 48} +{"productCategory": 5, "productId": 49} +{"productCategory": 5, "productId": 50} +{"productCategory": 4, "productId": 51} +{"productCategory": 5, "productId": 52} +{"productCategory": 1, "productId": 53} +{"productCategory": 5, "productId": 54} +{"productCategory": 4, "productId": 55} +{"productCategory": 4, "productId": 56} +{"productCategory": 2, "productId": 57} +{"productCategory": 4, "productId": 58} +{"productCategory": 4, "productId": 59} +{"productCategory": 4, "productId": 60} +{"productCategory": 1, "productId": 61} +{"productCategory": 2, "productId": 62} +{"productCategory": 3, "productId": 63} +{"productCategory": 5, "productId": 64} +{"productCategory": 1, "productId": 65} +{"productCategory": 5, "productId": 66} +{"productCategory": 5, "productId": 67} +{"productCategory": 2, "productId": 68} +{"productCategory": 3, "productId": 69} +{"productCategory": 3, "productId": 70} +{"productCategory": 2, "productId": 71} +{"productCategory": 3, "productId": 72} +{"productCategory": 4, "productId": 73} +{"productCategory": 2, "productId": 74} +{"productCategory": 3, "productId": 75} +{"productCategory": 3, "productId": 76} +{"productCategory": 4, "productId": 77} +{"productCategory": 5, "productId": 78} +{"productCategory": 4, "productId": 79} +{"productCategory": 1, "productId": 80} +{"productCategory": 1, "productId": 81} +{"productCategory": 1, "productId": 82} +{"productCategory": 3, "productId": 83} +{"productCategory": 1, "productId": 84} +{"productCategory": 5, "productId": 85} +{"productCategory": 3, "productId": 86} +{"productCategory": 4, "productId": 87} +{"productCategory": 1, "productId": 88} +{"productCategory": 5, "productId": 89} +{"productCategory": 3, "productId": 90} +{"productCategory": 5, "productId": 91} +{"productCategory": 2, "productId": 92} +{"productCategory": 2, "productId": 93} +{"productCategory": 3, "productId": 94} +{"productCategory": 1, "productId": 95} +{"productCategory": 1, "productId": 96} +{"productCategory": 5, "productId": 97} +{"productCategory": 3, "productId": 98} +{"productCategory": 5, "productId": 99} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9600eddd/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java b/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java index e07bd04..d70ca0c 100644 --- a/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java +++ b/library/src/main/java/com/datatorrent/lib/util/FieldInfo.java @@ -20,6 +20,8 @@ package com.datatorrent.lib.util; import javax.validation.constraints.NotNull; +import org.apache.commons.lang3.ClassUtils; + @SuppressWarnings("rawtypes") /** * @since 3.3.0 @@ -114,7 +116,7 @@ public class FieldInfo } public static enum SupportType { - BOOLEAN(Boolean.class), SHORT(Short.class), INTEGER(Integer.class), LONG(Long.class), FLOAT(Float.class), DOUBLE(Double.class), STRING(String.class); + BOOLEAN(Boolean.class), SHORT(Short.class), INTEGER(Integer.class), LONG(Long.class), FLOAT(Float.class), DOUBLE(Double.class), STRING(String.class), OBJECT(Object.class); private Class javaType; @@ -127,6 +129,17 @@ public class FieldInfo { return javaType; } + + public static SupportType getFromJavaType(Class type) + { + for (SupportType supportType : SupportType.values()) { + if (supportType.getJavaType() == ClassUtils.primitiveToWrapper(type)) { + return supportType; + } + } + + return OBJECT; + } } }