http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/CountFunction.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/CountFunction.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/CountFunction.java new file mode 100644 index 0000000..ac83885 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/CountFunction.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.function; + +import java.util.ArrayList; +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import org.apache.commons.lang.StringUtils; + +/** + * An implementation of function index that implements sql count function semantic. <br> + * <p> + * Counts number of values of given column and returns count of non null values in column. + * e.g : sql => SELECT COUNT(column_name) FROM table_name. <br> + * <br> + * <b> Properties : </b> <br> + * <b> column : </b> column name for values count. <br> + * <b> alias : </b> Alias name for aggregate output. <br> + * @displayName Count Function + * @category Stream Manipulators + * @tags sql count + * @since 0.3.4 + * @deprecated + */ +@Deprecated +public class CountFunction extends FunctionIndex +{ + /** + * @param column column for values count, must be non null. + * @param alias Alias name for aggregate output. + */ + public CountFunction(@NotNull String column, String alias) + { + super(column, alias); + } + + /** + * Count number of values of given column. + * @return Count of non null values in column. + */ + @Override + public Object compute(ArrayList<Map<String, Object>> rows) throws Exception + { + if (column.equals("*")) { + return rows.size(); + } + long count = 0; + for (Map<String, Object> row : rows) { + if (row.containsKey(column) && (row.get(column) != null)) { + count++; + } + } + return count; + } + + /** + * Aggregate output name. + * @return name string. + */ + @Override + protected String aggregateName() + { + if (!StringUtils.isEmpty(alias)) { + return alias; + } + return "COUNT(" + column + ")"; + } + +}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/FirstLastFunction.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/FirstLastFunction.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/FirstLastFunction.java new file mode 100644 index 0000000..296e449 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/FirstLastFunction.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.function; + +import java.util.ArrayList; +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import org.apache.commons.lang.StringUtils; + +/** + * An implementation of function index that implements sql first,last function semantic. <br> + * <p> + * e.g : sql => SELECT FIRST/LAST(column_name) FROM table_name. <br> + * <br> + * <b> Properties : </b> <br> + * <b> column : </b> column name for first/last value. <br> + * <b> alias : </b> Alias name for output. <br> + * <b> isFirst : </b> return first value if true. + * @displayName First Last Function + * @category Stream Manipulators + * @tags sql first, sql last + * @since 0.3.4 + * @deprecated + */ +@Deprecated +public class FirstLastFunction extends FunctionIndex +{ + /** + * return first value if true. + */ + private boolean isFirst; + + /** + * @param column column name for first/last value. + * @param alias Alias name for output. + * @param isFirst return first value if true. + */ + public FirstLastFunction(@NotNull String column, String alias, boolean isLast) + { + super(column, alias); + isFirst = !isLast; + } + + /** + * Get first/last non null value for column. + */ + @Override + public Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws Exception + { + if (rows.size() == 0) { + return null; + } + if (isFirst) { + for (int i = 0; i < rows.size(); i++) { + if (rows.get(i).get(column) != null) { + return rows.get(i).get(column); + } + } + } else { + for (int i = (rows.size() - 1); i >= 0; i--) { + if (rows.get(i).get(column) != null) { + return rows.get(i).get(column); + } + } + } + return null; + } + + /** + * Aggregate output name. + * @return name string. + */ + @Override + protected String aggregateName() + { + if (!StringUtils.isEmpty(alias)) { + return alias; + } + if (isFirst) { + return "FIRST(" + column + ")"; + } + return "LAST(" + column + ")"; + } + + public boolean isFirst() + { + return isFirst; + } + + public void setFirst(boolean isFirst) + { + this.isFirst = isFirst; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/FunctionIndex.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/FunctionIndex.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/FunctionIndex.java new file mode 100644 index 0000000..d307ad2 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/FunctionIndex.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.function; + +import java.util.ArrayList; +import java.util.Map; + +import javax.validation.constraints.NotNull; + +/** + * A base class for select aggregate function implementation. Subclasses should provide the + implementation for aggregate compute functions. + * <p> + * <br> + * <b>Properties : </b> <br> + * <b>column : </b> Column name for aggregation. + * <b>alias : </b> Output value alias name. + * @displayName Function Index + * @category Stream Manipulators + * @tags sql aggregate + * @since 0.3.4 + * @deprecated + */ +@Deprecated +public abstract class FunctionIndex +{ + /** + * Column name. + */ + @NotNull + protected String column; + + /** + * Alias name. + */ + protected String alias; + + /** + * @param column Column name for aggregation. + * @param alias Output value alias name. + */ + public FunctionIndex(@NotNull String column, String alias) + { + this.column = column; + this.alias = alias; + } + + /** + * Aggregate compute function, implementation in sub class. + * @param rows Tuple list over application window. + * @return aggregate result object. + */ + public abstract Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws Exception; + + /** + * Get aggregate output value name. + * @return name string. + */ + protected abstract String aggregateName(); + + /** + * Apply compute function to given rows and store result in collect by output value name. + * @param rows Tuple list over application window. + */ + public void filter(ArrayList<Map<String, Object>> rows, Map<String, Object> collect) throws Exception + { + if (rows == null) { + return; + } + String name = column; + if (alias != null) { + name = alias; + } + if (name == null) { + name = aggregateName(); + } + collect.put(name, compute(rows)); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/MaxMinFunction.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/MaxMinFunction.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/MaxMinFunction.java new file mode 100644 index 0000000..a602614 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/MaxMinFunction.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.function; + +import java.util.ArrayList; +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import org.apache.commons.lang.StringUtils; + +/** + * An implementation of function index that implements sql max and sql min function semantic. <br> + * <p> + * e.g : sql => SELECT MAX/MIN(column_name) FROM table_name. <br> + * <br> + * <b> Properties : </b> <br> + * <b> column : </b> column name for values max/min computation. <br> + * <b> alias : </b> Alias name for output value. <br> + * <b> isMax : </b> Flag to indicate max/min compute value. <br> + * @displayName Max Min Function + * @category Stream Manipulators + * @tags sql max, sql min + * @since 0.3.4 + * @deprecated + */ +@Deprecated +public class MaxMinFunction extends FunctionIndex +{ + /** + * Flag to indicate max/min compute value, compute max if true. + */ + private boolean isMax = true; + + /** + * @param column column name for values max/min computation. <br> + * @param alias Alias name for output. <br> + * @param isMax Flag to indicate max/min compute value. <br> + */ + public MaxMinFunction(@NotNull String column, String alias, boolean isMin) + { + super(column, alias); + isMax = !isMin; + } + + /** + * Compute max/min for given column. + * @return max/min value. + */ + @Override + public Object compute(ArrayList<Map<String, Object>> rows) throws Exception + { + double minMax = 0.0; + for (Map<String, Object> row : rows) { + double value = ((Number)row.get(column)).doubleValue(); + if ((isMax && (minMax < value)) || (!isMax && (minMax > value))) { + minMax = value; + } + } + return minMax; + } + + /** + * Aggregate output name. + * @return name string. + */ + @Override + protected String aggregateName() + { + if (!StringUtils.isEmpty(alias)) { + return alias; + } + if (isMax) { + return "MAX(" + column + ")"; + } + return "MIN(" + column + ")"; + } + + public boolean isMax() + { + return isMax; + } + + public void setMax(boolean isMax) + { + this.isMax = isMax; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/SumFunction.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/SumFunction.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/SumFunction.java new file mode 100644 index 0000000..ef4fb4b --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/SumFunction.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.function; + +import java.util.ArrayList; +import java.util.Map; + +import javax.validation.constraints.NotNull; + + + +/** + * <p> An implementation of sql sum function. </p> + * <p> + * @displayName Sum Function + * @category Stream Manipulators + * @tags sql sum, aggregate + * @since 0.3.4 + * @deprecated + */ +@Deprecated +public class SumFunction extends FunctionIndex +{ + public SumFunction(String column, String alias) throws Exception + { + super(column, alias); + } + + @Override + public Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws Exception + { + Double result = 0.0; + for (Map<String, Object> row : rows) { + if (!row.containsKey(column)) { + continue; + } + result += ((Number)row.get(column)).doubleValue(); + } + return result; + } + + @Override + protected String aggregateName() + { + return "Sum(" + column; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/BinaryExpression.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/BinaryExpression.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/BinaryExpression.java new file mode 100644 index 0000000..4de58c1 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/BinaryExpression.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.index; + +import javax.validation.constraints.NotNull; + +import com.datatorrent.lib.streamquery.index.Index; + +/** + * Abstract class to filter row by binary expression index. + * <p> + * Sub class will implement filter/getExpressionName functions. + * @displayName Binary Expression + * @category Stream Manipulators + * @tags alias + * @since 0.3.4 + * @deprecated + */ +@Deprecated +public abstract class BinaryExpression implements Index +{ + /** + * Left column name argument for expression. + */ + @NotNull + protected String left; + + /** + * Right column name argument for expression. + */ + @NotNull + protected String right; + + /** + * Alias name for output field. + */ + protected String alias; + + /** + * @param left column name argument for expression. + * @param right column name argument for expression. + * @param alias name for output field. + */ + public BinaryExpression(@NotNull String left, @NotNull String right, String alias) + { + this.left = left; + this.right = right; + } + + public String getAlias() + { + return alias; + } + + public void setAlias(String alias) + { + this.alias = alias; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/MidIndex.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/MidIndex.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/MidIndex.java new file mode 100644 index 0000000..c165d89 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/MidIndex.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.index; + +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import com.datatorrent.lib.streamquery.index.ColumnIndex; + +/** + * <p>An implementation of Column Index that implements filter method based on mid index. </p> + * <p> + * @displayName Mid Index + * @category Stream Manipulators + * @tags index + * @since 0.3.4 + * @deprecated + */ +@Deprecated +public class MidIndex extends ColumnIndex +{ + private int start; + private int length = 0; + + public MidIndex(@NotNull String column, String alias, int start) + { + super(column, alias); + assert (start >= 0); + this.start = start; + } + + @Override + public void filter(@NotNull Map<String, Object> row, @NotNull Map<String, Object> collect) + { + if (!row.containsKey(column)) { + return; + } + if (!(row.get(column) instanceof String)) { + assert (false); + } + String name = getColumn(); + if (alias != null) { + name = alias; + } + + int endIndex = start + length; + if ((length == 0) || (endIndex > ((String)row.get(column)).length())) { + collect.put(name, row.get(column)); + } else { + collect.put(name, ((String)row.get(column)).substring(start, endIndex)); + } + } + + public int getLength() + { + return length; + } + + public void setLength(int length) + { + assert (length > 0); + this.length = length; + } +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/NegateExpression.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/NegateExpression.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/NegateExpression.java new file mode 100644 index 0000000..0a6f64d --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/NegateExpression.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.index; + +import java.util.Map; + +import javax.validation.constraints.Null; + + +/** + * An implementation of Unary Expression that implements filter method using negate metric sql semantic on column value. + * <p> + * @displayName Negate Expression + * @category Stream Manipulators + * @tags expression, alias + * @since 0.3.4 + * @deprecated + */ +@Deprecated +public class NegateExpression extends UnaryExpression +{ + + /** + * @param column Name of column value to be negated. + */ + public NegateExpression(@Null String column, String alias) + { + super(column, alias); + if (this.alias == null) { + this.alias = "NEGATE(" + column + ")"; + } + } + + /* (non-Javadoc) + * @see com.datatorrent.lib.streamquery.index.Index#filter(java.util.Map, java.util.Map) + */ + @Override + public void filter(Map<String, Object> row, Map<String, Object> collect) + { + if (!row.containsKey(column)) { + return; + } + collect.put(alias, -((Number)row.get(column)).doubleValue()); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/RoundDoubleIndex.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/RoundDoubleIndex.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/RoundDoubleIndex.java new file mode 100644 index 0000000..495063f --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/RoundDoubleIndex.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.index; + +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import com.datatorrent.lib.streamquery.index.ColumnIndex; + +/** + * <p>An implementation of column index that implements filter method using Round Double Index. </p> + * + * @displayName Round Double Index + * @category Stream Manipulators + * @tags alias, maths + * @since 0.3.4 + * @deprecated + */ +@Deprecated +public class RoundDoubleIndex extends ColumnIndex +{ + private int rounder; + public RoundDoubleIndex(@NotNull String column, String alias, int numDecimals) + { + super(column, alias); + rounder = 1; + if (numDecimals > 0) { + rounder = (int)Math.pow(10, numDecimals); + } + } + + @Override + public void filter(@NotNull Map<String, Object> row, @NotNull Map<String, Object> collect) + { + if (!row.containsKey(column)) { + return; + } + double value = (Double)row.get(column); + value = Math.round(value * rounder) / rounder; + String name = getColumn(); + if (alias != null) { + name = alias; + } + collect.put(name, value); + } +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/StringCaseIndex.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/StringCaseIndex.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/StringCaseIndex.java new file mode 100644 index 0000000..31c9468 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/StringCaseIndex.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.index; + +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import com.datatorrent.lib.streamquery.index.ColumnIndex; + +/** + * <p>An implementation of Column Index that implements filter method using case of a string index. </p> + * + * @displayName String Case Index + * @category Stream Manipulators + * @tags alias + * @since 0.3.4 + * @deprecated + */ +@Deprecated +public class StringCaseIndex extends ColumnIndex +{ + private boolean toUpperCase = true; + public StringCaseIndex(@NotNull String column, String alias, boolean toLowerCase) + { + super(column, alias); + toUpperCase = !toLowerCase; + } + + @Override + public void filter(@NotNull Map<String, Object> row, @NotNull Map<String, Object> collect) + { + if (!row.containsKey(column)) { + return; + } + if (!(row.get(column) instanceof String)) { + assert (false); + } + + String name = getColumn(); + if (alias != null) { + name = alias; + } + if (toUpperCase) { + collect.put(name, ((String)row.get(column)).toUpperCase()); + } else { + collect.put(name, ((String)row.get(column)).toLowerCase()); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/StringLenIndex.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/StringLenIndex.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/StringLenIndex.java new file mode 100644 index 0000000..f764c9e --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/StringLenIndex.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.index; + +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import com.datatorrent.lib.streamquery.index.ColumnIndex; + +/** + * <p>An implementation of Column Index that implements filter method using length of a string Index. </p> + * <p> + * @displayName String Length Index + * @category Stream Manipulators + * @tags alias + * @since 0.3.4 + * @deprecated + */ +@Deprecated +public class StringLenIndex extends ColumnIndex +{ + public StringLenIndex(@NotNull String column, String alias) + { + super(column, alias); + } + + @Override + public void filter(@NotNull Map<String, Object> row, @NotNull Map<String, Object> collect) + { + if (!row.containsKey(column)) { + return; + } + if (!(row.get(column) instanceof String)) { + assert (false); + } + + String name = getColumn(); + if (alias != null) { + name = alias; + } + collect.put(name, ((String)row.get(column)).length()); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/SumExpression.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/SumExpression.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/SumExpression.java new file mode 100644 index 0000000..91d4ec7 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/SumExpression.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.index; + +import java.util.Map; + +import javax.validation.constraints.NotNull; + + +/** + * Implements sum on column index. + * <p> + * Select index class for implementing sum column index. + * @displayName Sum Expression + * @category Stream Manipulators + * @tags sum + * @since 0.3.4 + * @deprecated + */ +@Deprecated +public class SumExpression extends BinaryExpression +{ + + /** + * @param left column name argument for expression. + * @param right column name argument for expression. + * @param alias name for output field. + */ + public SumExpression(@NotNull String left, @NotNull String right, String alias) + { + super(left, right, alias); + if (this.alias == null) { + this.alias = "SUM(" + left + "," + right + ")"; + } + } + + /* sum column values. + * @see com.datatorrent.lib.streamquery.index.Index#filter(java.util.Map, java.util.Map) + */ + @Override + public void filter(Map<String, Object> row, Map<String, Object> collect) + { + if (!row.containsKey(left) || !row.containsKey(right)) { + return; + } + collect.put(alias, ((Number)row.get(left)).doubleValue() + ((Number)row.get(right)).doubleValue()); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/UnaryExpression.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/UnaryExpression.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/UnaryExpression.java new file mode 100644 index 0000000..04d5fc6 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/UnaryExpression.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.streamquery.index; + +import javax.validation.constraints.NotNull; + +import com.datatorrent.lib.streamquery.index.Index; + +/** + * A base implementation of an index that filters row by unary expression. Subclasses should provide the + implementation of filter/getExpressionName functions. + * <p> + * Sub class will implement filter/getExpressionName functions. + * @displayName Unary Expression + * @category Stream Manipulators + * @tags unary, alias + * @since 0.3.4 + * @deprecated + */ +@Deprecated +public abstract class UnaryExpression implements Index +{ + /** + * Column name argument for unary expression. + */ + @NotNull + protected String column; + + /** + * Alias name for output field. + */ + protected String alias; + + /** + * @param column name argument for unary expression. + * @param alias name for output field. + */ + public UnaryExpression(@NotNull String column, String alias) + { + this.column = column; + } + + public String getColumn() + { + return column; + } + + public void setColumn(String column) + { + this.column = column; + } + + public String getAlias() + { + return alias; + } + + public void setAlias(String alias) + { + this.alias = alias; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/package-info.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/package-info.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/package-info.java new file mode 100644 index 0000000..358c0bb --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Library of operators for streaming query language. + */ [email protected] +package org.apache.apex.malhar.lib.misc.streamquery; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/AbstractStreamPatternMatcherTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/AbstractStreamPatternMatcherTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/AbstractStreamPatternMatcherTest.java new file mode 100644 index 0000000..1f669bc --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/AbstractStreamPatternMatcherTest.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +/** + * + * Functional tests for {@link org.apache.apex.malhar.contrib.misc.algo.AbstractStreamPatternMatcher}<p> + * @deprecated + */ + +import java.util.Random; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.lib.testbench.CollectorTestSink; + + +@Deprecated +public class AbstractStreamPatternMatcherTest +{ + + public static class StreamPatternMatcher<T> extends AbstractStreamPatternMatcher<T> + { + @Override + public void processPatternFound() + { + outputPort.emit(getPattern().getStates()); + } + + public transient DefaultOutputPort<T[]> outputPort = new DefaultOutputPort<T[]>(); + } + + private StreamPatternMatcher<Integer> streamPatternMatcher; + private AbstractStreamPatternMatcher.Pattern<Integer> pattern; + private Integer[] inputPattern; + private CollectorTestSink<Object> sink; + + @Before + public void setup() + { + streamPatternMatcher = new StreamPatternMatcher<Integer>(); + sink = new CollectorTestSink<Object>(); + streamPatternMatcher.outputPort.setSink(sink); + } + + @After + public void cleanup() + { + streamPatternMatcher.teardown(); + sink.collectedTuples.clear(); + } + + @Test + public void test() throws Exception + { + inputPattern = new Integer[]{0, 1, 0, 1, 2}; + pattern = new AbstractStreamPatternMatcher.Pattern<Integer>(inputPattern); + streamPatternMatcher.setPattern(pattern); + streamPatternMatcher.setup(null); + streamPatternMatcher.beginWindow(0); + streamPatternMatcher.inputPort.process(0); + streamPatternMatcher.inputPort.process(1); + streamPatternMatcher.inputPort.process(1); + streamPatternMatcher.inputPort.process(0); + streamPatternMatcher.inputPort.process(1); + streamPatternMatcher.inputPort.process(0); + streamPatternMatcher.inputPort.process(1); + streamPatternMatcher.inputPort.process(2); + streamPatternMatcher.inputPort.process(1); + streamPatternMatcher.endWindow(); + Assert.assertEquals("The number of tuples emitted is one", 1, sink.collectedTuples.size()); + Assert.assertEquals("Matching the output pattern with input pattern", inputPattern, sink.collectedTuples.get(0)); + } + + @Test + public void testSimplePattern() throws Exception + { + inputPattern = new Integer[]{0, 0}; + pattern = new AbstractStreamPatternMatcher.Pattern<Integer>(inputPattern); + streamPatternMatcher.setPattern(pattern); + streamPatternMatcher.setup(null); + streamPatternMatcher.beginWindow(0); + streamPatternMatcher.inputPort.process(0); + streamPatternMatcher.inputPort.process(0); + streamPatternMatcher.inputPort.process(0); + streamPatternMatcher.inputPort.process(1); + streamPatternMatcher.inputPort.process(0); + streamPatternMatcher.inputPort.process(0); + streamPatternMatcher.endWindow(); + Assert.assertEquals("The number of tuples emitted are three", 3, sink.collectedTuples.size()); + for (Object object : sink.collectedTuples) { + Assert.assertEquals("Matching the output pattern with input pattern", inputPattern, object); + } + } + + @Test + public void testPatternWithSingleState() throws Exception + { + inputPattern = new Integer[]{0}; + pattern = new AbstractStreamPatternMatcher.Pattern<Integer>(inputPattern); + streamPatternMatcher.setPattern(pattern); + streamPatternMatcher.setup(null); + streamPatternMatcher.beginWindow(0); + streamPatternMatcher.inputPort.process(0); + streamPatternMatcher.inputPort.process(0); + streamPatternMatcher.inputPort.process(0); + streamPatternMatcher.inputPort.process(1); + streamPatternMatcher.inputPort.process(0); + streamPatternMatcher.inputPort.process(0); + streamPatternMatcher.endWindow(); + Assert.assertEquals("The number of tuples emitted are three", 5, sink.collectedTuples.size()); + for (Object object : sink.collectedTuples) { + Assert.assertEquals("Matching the output pattern with input pattern", inputPattern, object); + } + } + + @Test + public void testAutoGeneratedPattern() throws Exception + { + Random random = new Random(); + int patternSize = 15; + inputPattern = new Integer[patternSize]; + int max = 10; + int min = 1; + int primeNumber = 5; + for (int i = 0; i < patternSize; i++) { + inputPattern[i] = (min + random.nextInt(max)); + } + pattern = new AbstractStreamPatternMatcher.Pattern<Integer>(inputPattern); + streamPatternMatcher.setPattern(pattern); + streamPatternMatcher.setup(null); + streamPatternMatcher.beginWindow(0); + int numberOfIterations = 20; + for (int i = 0; i < patternSize; i++) { + for (int j = 0; j <= i; j++) { + streamPatternMatcher.inputPort.process(inputPattern[j]); + } + for (int k = 0; k < numberOfIterations; k++) { + streamPatternMatcher.inputPort.process(max + min + random.nextInt(max)); + } + if (i % primeNumber == 0) { + for (int j = 0; j < patternSize; j++) { + streamPatternMatcher.inputPort.process(inputPattern[j]); + } + } + } + streamPatternMatcher.endWindow(); + Assert.assertEquals("The number of tuples emitted ", 1 + patternSize / primeNumber, sink.collectedTuples.size()); + for (Object output : sink.collectedTuples) { + Assert.assertEquals("Matching the output pattern with input pattern", inputPattern, output); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/AllAfterMatchMapTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/AllAfterMatchMapTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/AllAfterMatchMapTest.java new file mode 100644 index 0000000..1356004 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/AllAfterMatchMapTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * @deprecated + * Functional tests for {@link AllAfterMatchMapTest} + * <p> + */ +@Deprecated +public class AllAfterMatchMapTest +{ + /** + * Test node logic emits correct results + */ + @Test + public void testNodeProcessing() throws Exception + { + testNodeProcessingSchema(new AllAfterMatchMap<String, Integer>()); + testNodeProcessingSchema(new AllAfterMatchMap<String, Double>()); + testNodeProcessingSchema(new AllAfterMatchMap<String, Float>()); + testNodeProcessingSchema(new AllAfterMatchMap<String, Short>()); + testNodeProcessingSchema(new AllAfterMatchMap<String, Long>()); + } + + @SuppressWarnings({ "unchecked", "rawtypes", "unchecked" }) + public void testNodeProcessingSchema(AllAfterMatchMap oper) + { + CollectorTestSink allSink = new CollectorTestSink(); + oper.allafter.setSink(allSink); + oper.setKey("a"); + oper.setValue(3.0); + oper.setTypeEQ(); + + oper.beginWindow(0); + HashMap<String, Number> input = new HashMap<String, Number>(); + input.put("a", 2); + input.put("b", 20); + input.put("c", 1000); + oper.data.process(input); + input.clear(); + input.put("a", 3); + oper.data.process(input); + + input.clear(); + input.put("b", 6); + oper.data.process(input); + + input.clear(); + input.put("c", 9); + oper.data.process(input); + + oper.endWindow(); + + Assert.assertEquals("number emitted tuples", 3, + allSink.collectedTuples.size()); + for (Object o : allSink.collectedTuples) { + for (Map.Entry<String, Number> e : ((HashMap<String, Number>)o).entrySet()) { + if (e.getKey().equals("a")) { + Assert.assertEquals("emitted value for 'a' was ", new Double(3), new Double(e.getValue().doubleValue())); + } else if (e.getKey().equals("b")) { + Assert.assertEquals("emitted tuple for 'b' was ", new Double(6), new Double(e.getValue().doubleValue())); + } else if (e.getKey().equals("c")) { + Assert.assertEquals("emitted tuple for 'c' was ", new Double(9), new Double(e.getValue().doubleValue())); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/DistinctMapTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/DistinctMapTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/DistinctMapTest.java new file mode 100644 index 0000000..c082f83 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/DistinctMapTest.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; + +import org.junit.Test; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * @deprecated + * Functional tests for {@link DistinctMap}<p> + * + */ +@Deprecated +public class DistinctMapTest +{ + /** + * Test node logic emits correct results + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testNodeProcessing() throws Exception + { + DistinctMap<String, Number> oper = new DistinctMap<String, Number>(); + + CollectorTestSink sortSink = new CollectorTestSink(); + oper.distinct.setSink(sortSink); + + + oper.beginWindow(0); + HashMap<String, Number> input = new HashMap<String, Number>(); + + input.put("a", 2); + oper.data.process(input); + input.clear(); + input.put("a", 2); + oper.data.process(input); + + input.clear(); + input.put("a", 1000); + oper.data.process(input); + + input.clear(); + input.put("a", 5); + oper.data.process(input); + + input.clear(); + input.put("a", 2); + input.put("b", 33); + oper.data.process(input); + + input.clear(); + input.put("a", 33); + input.put("b", 34); + oper.data.process(input); + + input.clear(); + input.put("b", 34); + oper.data.process(input); + + input.clear(); + input.put("b", 6); + input.put("a", 2); + oper.data.process(input); + input.clear(); + input.put("c", 9); + oper.data.process(input); + oper.endWindow(); + + Assert.assertEquals("number emitted tuples", 8, sortSink.collectedTuples.size()); + int aval = 0; + int bval = 0; + int cval = 0; + for (Object o: sortSink.collectedTuples) { + for (Map.Entry<String, Integer> e: ((HashMap<String, Integer>)o).entrySet()) { + String key = e.getKey(); + if (key.equals("a")) { + aval += e.getValue(); + } else if (key.equals("b")) { + bval += e.getValue(); + } else if (key.equals("c")) { + cval += e.getValue(); + } + } + } + Assert.assertEquals("Total for key \"a\" ", 1040, aval); + Assert.assertEquals("Total for key \"a\" ", 73, bval); + Assert.assertEquals("Total for key \"a\" ", 9, cval); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeyValsTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeyValsTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeyValsTest.java new file mode 100644 index 0000000..be2f6ee --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeyValsTest.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * @deprecated + * Functional tests for {@link FilterKeyVals}<p> + * + */ +@Deprecated +public class FilterKeyValsTest +{ + @SuppressWarnings("unchecked") + int getTotal(List<Object> list) + { + int ret = 0; + for (Object map: list) { + for (Map.Entry<String, Number> e: ((HashMap<String, Number>)map).entrySet()) { + ret += e.getValue().intValue(); + } + } + return ret; + } + + /** + * Test node logic emits correct results + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testNodeProcessing() throws Exception + { + FilterKeyVals<String,Number> oper = new FilterKeyVals<String,Number>(); + + CollectorTestSink sortSink = new CollectorTestSink(); + oper.filter.setSink(sortSink); + HashMap<String,Number> filter = new HashMap<String,Number>(); + filter.put("b",2); + oper.setKeyVals(filter); + oper.clearKeys(); + + filter.clear(); + filter.put("e", 200); + filter.put("f", 2); + filter.put("blah", 2); + oper.setKeyVals(filter); + filter.clear(); + filter.put("a", 2); + oper.setKeyVals(filter); + + oper.beginWindow(0); + HashMap<String, Number> input = new HashMap<String, Number>(); + input.put("a", 2); + input.put("b", 5); + input.put("c", 7); + input.put("d", 42); + input.put("e", 202); + input.put("e", 200); + input.put("f", 2); + oper.data.process(input); + Assert.assertEquals("number emitted tuples", 3, sortSink.collectedTuples.size()); + Assert.assertEquals("Total filtered value is ", 204, getTotal(sortSink.collectedTuples)); + sortSink.clear(); + + input.clear(); + input.put("a", 5); + oper.data.process(input); + Assert.assertEquals("number emitted tuples", 0, sortSink.collectedTuples.size()); + sortSink.clear(); + + input.clear(); + input.put("a", 2); + input.put("b", 33); + input.put("f", 2); + oper.data.process(input); + Assert.assertEquals("number emitted tuples", 2, sortSink.collectedTuples.size()); + Assert.assertEquals("Total filtered value is ", 4, getTotal(sortSink.collectedTuples)); + sortSink.clear(); + + input.clear(); + input.put("b", 6); + input.put("a", 2); + input.put("j", 6); + input.put("e", 2); + input.put("dd", 6); + input.put("blah", 2); + input.put("another", 6); + input.put("notmakingit", 2); + oper.data.process(input); + Assert.assertEquals("number emitted tuples", 2, sortSink.collectedTuples.size()); + Assert.assertEquals("Total filtered value is ", 4, getTotal(sortSink.collectedTuples)); + sortSink.clear(); + + input.clear(); + input.put("c", 9); + oper.setInverse(true); + oper.data.process(input); + Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size()); + Assert.assertEquals("Total filtered value is ", 9, getTotal(sortSink.collectedTuples)); + + oper.endWindow(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysHashMapTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysHashMapTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysHashMapTest.java new file mode 100644 index 0000000..a25c2df --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysHashMapTest.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; + +import org.junit.Test; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * + * Functional tests for {@link FilterKeysHashMap}<p> + * @deprecated + */ +@Deprecated +public class FilterKeysHashMapTest +{ + @SuppressWarnings("unchecked") + int getTotal(Object o) + { + HashMap<String, HashMap<String, Number>> map = (HashMap<String, HashMap<String, Number>>)o; + int ret = 0; + for (Map.Entry<String, HashMap<String, Number>> e: map.entrySet()) { + for (Map.Entry<String, Number> e2: e.getValue().entrySet()) { + ret += e2.getValue().intValue(); + } + } + return ret; + } + + /** + * Test node logic emits correct results + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testNodeProcessing() throws Exception + { + FilterKeysHashMap<String, Number> oper = new FilterKeysHashMap<String, Number>(); + + CollectorTestSink sortSink = new CollectorTestSink(); + oper.filter.setSink(sortSink); + oper.setKey("b"); + oper.clearKeys(); + String[] keys = new String[3]; + keys[0] = "e"; + keys[1] = "f"; + keys[2] = "blah"; + oper.setKey("a"); + oper.setKeys(keys); + + oper.beginWindow(0); + HashMap<String, HashMap<String, Number>> inputA = new HashMap<String, HashMap<String, Number>>(); + HashMap<String, Number> input = new HashMap<String, Number>(); + HashMap<String, Number> input2 = new HashMap<String, Number>(); + + input.put("a", 2); + input.put("b", 5); + input.put("c", 7); + input.put("d", 42); + input.put("e", 200); + input.put("f", 2); + inputA.put("A", input); + oper.data.process(inputA); + Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size()); + Assert.assertEquals("Total filtered value is ", 204, getTotal(sortSink.collectedTuples.get(0))); + sortSink.clear(); + + input.clear(); + inputA.clear(); + input.put("a", 5); + inputA.put("A", input); + oper.data.process(inputA); + Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size()); + Assert.assertEquals("Total filtered value is ", 5, getTotal(sortSink.collectedTuples.get(0))); + sortSink.clear(); + + input.clear(); + inputA.clear(); + input.put("a", 2); + input.put("b", 33); + input.put("f", 2); + inputA.put("A", input); + oper.data.process(inputA); + Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size()); + Assert.assertEquals("Total filtered value is ", 4, getTotal(sortSink.collectedTuples.get(0))); + sortSink.clear(); + + input.clear(); + inputA.clear(); + input.put("b", 6); + input.put("a", 2); + input.put("j", 6); + input.put("e", 2); + input.put("dd", 6); + input.put("blah", 2); + input.put("another", 6); + input.put("notmakingit", 2); + inputA.put("A", input); + oper.data.process(inputA); + Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size()); + Assert.assertEquals("Total filtered value is ", 6, getTotal(sortSink.collectedTuples.get(0))); + sortSink.clear(); + + input.clear(); + inputA.clear(); + input.put("c", 9); + oper.setInverse(true); + inputA.put("A", input); + oper.data.process(inputA); + Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size()); + Assert.assertEquals("Total filtered value is ", 9, getTotal(sortSink.collectedTuples.get(0))); + sortSink.clear(); + + input.clear(); + input2.clear(); + inputA.clear(); + input.put("e", 2); // pass + input.put("c", 9); + input2.put("a", 5); // pass + input2.put("p", 8); + oper.setInverse(false); + inputA.put("A", input); + inputA.put("B", input2); + oper.data.process(inputA); + Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size()); + Assert.assertEquals("Total filtered value is ", 7, getTotal(sortSink.collectedTuples.get(0))); + sortSink.clear(); + + oper.endWindow(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysMapTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysMapTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysMapTest.java new file mode 100644 index 0000000..74cebdd --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysMapTest.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * @deprecated + * Functional tests for {@link FilterKeysMap}<p> + * + */ +@Deprecated +public class FilterKeysMapTest +{ + @SuppressWarnings("unchecked") + int getTotal(Object o) + { + HashMap<String, Number> map = (HashMap<String, Number>)o; + int ret = 0; + for (Map.Entry<String, Number> e: map.entrySet()) { + ret += e.getValue().intValue(); + } + return ret; + } + + /** + * Test node logic emits correct results + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testNodeProcessing() throws Exception + { + FilterKeysMap<String,Number> oper = new FilterKeysMap<String,Number>(); + + CollectorTestSink sortSink = new CollectorTestSink(); + oper.filter.setSink(sortSink); + oper.setKey("b"); + oper.clearKeys(); + String[] keys = new String[3]; + keys[0] = "e"; + keys[1] = "f"; + keys[2] = "blah"; + oper.setKey("a"); + oper.setKeys(keys); + + oper.beginWindow(0); + HashMap<String, Number> input = new HashMap<String, Number>(); + + input.put("a", 2); + input.put("b", 5); + input.put("c", 7); + input.put("d", 42); + input.put("e", 200); + input.put("f", 2); + oper.data.process(input); + Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size()); + Assert.assertEquals("Total filtered value is ", 204, getTotal(sortSink.collectedTuples.get(0))); + sortSink.clear(); + + input.clear(); + input.put("a", 5); + oper.data.process(input); + Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size()); + Assert.assertEquals("Total filtered value is ", 5, getTotal(sortSink.collectedTuples.get(0))); + sortSink.clear(); + + input.clear(); + input.put("a", 2); + input.put("b", 33); + input.put("f", 2); + oper.data.process(input); + Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size()); + Assert.assertEquals("Total filtered value is ", 4, getTotal(sortSink.collectedTuples.get(0))); + sortSink.clear(); + + input.clear(); + input.put("b", 6); + input.put("a", 2); + input.put("j", 6); + input.put("e", 2); + input.put("dd", 6); + input.put("blah", 2); + input.put("another", 6); + input.put("notmakingit", 2); + oper.data.process(input); + Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size()); + Assert.assertEquals("Total filtered value is ", 6, getTotal(sortSink.collectedTuples.get(0))); + sortSink.clear(); + + input.clear(); + input.put("c", 9); + oper.setInverse(true); + oper.data.process(input); + Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size()); + Assert.assertEquals("Total filtered value is ", 9, getTotal(sortSink.collectedTuples.get(0))); + + oper.endWindow(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstMatchMapTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstMatchMapTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstMatchMapTest.java new file mode 100644 index 0000000..d02ee27 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstMatchMapTest.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.HashMap; + +import org.junit.Assert; +import org.junit.Test; + +import com.datatorrent.lib.testbench.CountAndLastTupleTestSink; + +/** + * @deprecated + * Functional tests for {@link FirstMatchMap}<p> + * + */ +@Deprecated +public class FirstMatchMapTest +{ + /** + * Test node logic emits correct results + */ + @Test + public void testNodeProcessing() throws Exception + { + testNodeProcessingSchema(new FirstMatchMap<String, Integer>()); + testNodeProcessingSchema(new FirstMatchMap<String, Double>()); + testNodeProcessingSchema(new FirstMatchMap<String, Float>()); + testNodeProcessingSchema(new FirstMatchMap<String, Short>()); + testNodeProcessingSchema(new FirstMatchMap<String, Long>()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void testNodeProcessingSchema(FirstMatchMap oper) + { + CountAndLastTupleTestSink matchSink = new CountAndLastTupleTestSink(); + oper.first.setSink(matchSink); + oper.setKey("a"); + oper.setValue(3); + oper.setTypeEQ(); + + oper.beginWindow(0); + HashMap<String, Number> input = new HashMap<String, Number>(); + input.put("a", 4); + input.put("b", 20); + input.put("c", 1000); + oper.data.process(input); + input.put("a", 3); + input.put("b", 20); + input.put("c", 1000); + oper.data.process(input); + input.clear(); + input.put("a", 2); + oper.data.process(input); + input.clear(); + input.put("a", 4); + input.put("b", 21); + input.put("c", 1000); + oper.data.process(input); + input.clear(); + input.put("a", 4); + input.put("b", 20); + input.put("c", 5); + oper.data.process(input); + oper.endWindow(); + + Assert.assertEquals("number emitted tuples", 1, matchSink.count); + HashMap<String, Number> tuple = (HashMap<String, Number>)matchSink.tuple; + Number aval = tuple.get("a"); + Assert.assertEquals("Value of a was ", 3, aval.intValue()); + matchSink.clear(); + + oper.beginWindow(0); + input.clear(); + input.put("a", 2); + input.put("b", 20); + input.put("c", 1000); + oper.data.process(input); + input.clear(); + input.put("a", 5); + oper.data.process(input); + oper.endWindow(); + // There should be no emit as all tuples do not match + Assert.assertEquals("number emitted tuples", 0, matchSink.count); + matchSink.clear(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstNTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstNTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstNTest.java new file mode 100644 index 0000000..e6c3e7e --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstNTest.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * @deprecated + * Functional tests for {@link FirstN}<p> + */ +@Deprecated +public class FirstNTest +{ + private static Logger log = LoggerFactory.getLogger(FirstNTest.class); + + /** + * Test node logic emits correct results + */ + @Test + public void testNodeProcessing() throws Exception + { + testNodeProcessingSchema(new FirstN<String, Integer>()); + testNodeProcessingSchema(new FirstN<String, Double>()); + testNodeProcessingSchema(new FirstN<String, Float>()); + testNodeProcessingSchema(new FirstN<String, Short>()); + testNodeProcessingSchema(new FirstN<String, Long>()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void testNodeProcessingSchema(FirstN oper) + { + CollectorTestSink sortSink = new CollectorTestSink(); + oper.first.setSink(sortSink); + oper.setN(3); + + oper.beginWindow(0); + HashMap<String, Number> input = new HashMap<String, Number>(); + + input.put("a", 2); + oper.data.process(input); + + input.clear(); + input.put("a", 20); + oper.data.process(input); + + input.clear(); + input.put("a", 1000); + oper.data.process(input); + + input.clear(); + input.put("a", 5); + oper.data.process(input); + + input.clear(); + input.put("a", 20); + input.put("b", 33); + oper.data.process(input); + + input.clear(); + input.put("a", 33); + input.put("b", 34); + oper.data.process(input); + + input.clear(); + input.put("b", 34); + input.put("a", 1001); + oper.data.process(input); + + input.clear(); + input.put("b", 6); + input.put("a", 1); + oper.data.process(input); + input.clear(); + input.put("c", 9); + oper.data.process(input); + oper.endWindow(); + + Assert.assertEquals("number emitted tuples", 7, sortSink.collectedTuples.size()); + int aval = 0; + int bval = 0; + int cval = 0; + for (Object o : sortSink.collectedTuples) { + for (Map.Entry<String, Number> e : ((HashMap<String, Number>)o).entrySet()) { + if (e.getKey().equals("a")) { + aval += e.getValue().intValue(); + } else if (e.getKey().equals("b")) { + bval += e.getValue().intValue(); + } else if (e.getKey().equals("c")) { + cval += e.getValue().intValue(); + } + } + } + Assert.assertEquals("Value of \"a\" was ", 1022, aval); + Assert.assertEquals("Value of \"a\" was ", 101, bval); + Assert.assertEquals("Value of \"a\" was ", 9, cval); + log.debug("Done testing round\n"); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstTillMatchTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstTillMatchTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstTillMatchTest.java new file mode 100644 index 0000000..51b8415 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstTillMatchTest.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.HashMap; + +import org.junit.Assert; + +import org.junit.Test; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * @deprecated + * Functional tests for {@link FirstTillMatch}<p> + * + */ +@Deprecated +public class FirstTillMatchTest +{ + /** + * Test node logic emits correct results + */ + @Test + public void testNodeProcessing() throws Exception + { + testNodeProcessingSchema(new FirstTillMatch<String, Integer>()); + testNodeProcessingSchema(new FirstTillMatch<String, Double>()); + testNodeProcessingSchema(new FirstTillMatch<String, Float>()); + testNodeProcessingSchema(new FirstTillMatch<String, Short>()); + testNodeProcessingSchema(new FirstTillMatch<String, Long>()); + } + + @SuppressWarnings( {"unchecked", "rawtypes"}) + public void testNodeProcessingSchema(FirstTillMatch oper) + { + CollectorTestSink matchSink = new CollectorTestSink(); + oper.first.setSink(matchSink); + oper.setKey("a"); + oper.setValue(3); + oper.setTypeEQ(); + + oper.beginWindow(0); + HashMap<String, Number> input = new HashMap<String, Number>(); + input.put("a", 4); + input.put("b", 20); + input.put("c", 1000); + oper.data.process(input); + input.clear(); + input.put("a", 2); + oper.data.process(input); + input.put("a", 3); + input.put("b", 20); + input.put("c", 1000); + oper.data.process(input); + input.clear(); + input.put("a", 4); + input.put("b", 21); + input.put("c", 1000); + oper.data.process(input); + input.clear(); + input.put("a", 6); + input.put("b", 20); + input.put("c", 5); + oper.data.process(input); + oper.endWindow(); + + Assert.assertEquals("number emitted tuples", 2, matchSink.collectedTuples.size()); + int atotal = 0; + for (Object o: matchSink.collectedTuples) { + atotal += ((HashMap<String,Number>)o).get("a").intValue(); + } + Assert.assertEquals("Value of a was ", 6, atotal); + matchSink.clear(); + + oper.beginWindow(0); + input.clear(); + input.put("a", 2); + input.put("b", 20); + input.put("c", 1000); + oper.data.process(input); + input.clear(); + input.put("a", 5); + oper.data.process(input); + oper.endWindow(); + // There should be no emit as all tuples do not match + Assert.assertEquals("number emitted tuples", 2, matchSink.collectedTuples.size()); + atotal = 0; + for (Object o: matchSink.collectedTuples) { + atotal += ((HashMap<String,Number>)o).get("a").intValue(); + } + Assert.assertEquals("Value of a was ", 7, atotal); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InsertSortDescTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InsertSortDescTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InsertSortDescTest.java new file mode 100644 index 0000000..cebd628 --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InsertSortDescTest.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.ArrayList; +import java.util.HashMap; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * @deprecated + * Functional tests for {@link InsertSortDesc}<p> + */ +@Deprecated +public class InsertSortDescTest +{ + private static Logger log = LoggerFactory.getLogger(InsertSortDescTest.class); + + /** + * Test node logic emits correct results + */ + @Test + public void testNodeProcessing() throws Exception + { + testNodeProcessingSchema(new InsertSortDesc<Integer>(), "Integer"); + testNodeProcessingSchema(new InsertSortDesc<Double>(), "Double"); + testNodeProcessingSchema(new InsertSortDesc<Float>(), "Float"); + testNodeProcessingSchema(new InsertSortDesc<String>(), "String"); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void testNodeProcessingSchema(InsertSortDesc oper, String debug) + { + //FirstN<String,Float> aoper = new FirstN<String,Float>(); + CollectorTestSink sortSink = new CollectorTestSink(); + CollectorTestSink hashSink = new CollectorTestSink(); + oper.sort.setSink(sortSink); + oper.sorthash.setSink(hashSink); + + ArrayList input = new ArrayList(); + + oper.beginWindow(0); + + input.add(2); + oper.datalist.process(input); + oper.data.process(20); + + input.clear(); + input.add(1000); + input.add(5); + input.add(20); + input.add(33); + input.add(33); + input.add(34); + oper.datalist.process(input); + + input.clear(); + input.add(34); + input.add(1001); + input.add(6); + input.add(1); + input.add(33); + input.add(9); + oper.datalist.process(input); + oper.endWindow(); + + Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size()); + Assert.assertEquals("number emitted tuples", 1, hashSink.collectedTuples.size()); + HashMap map = (HashMap)hashSink.collectedTuples.get(0); + input = (ArrayList)sortSink.collectedTuples.get(0); + for (Object o : input) { + log.debug(String.format("%s : %s", o.toString(), map.get(o).toString())); + } + log.debug(String.format("Tested %s type with %d tuples and %d uniques\n", debug, input.size(), map.size())); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexArrayTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexArrayTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexArrayTest.java new file mode 100644 index 0000000..d37789d --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexArrayTest.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.Sink; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +/** + * @deprecated + * Functional tests for {@link InvertIndex} <p> + * + */ +@Deprecated +public class InvertIndexArrayTest +{ + private static Logger log = LoggerFactory.getLogger(InvertIndexArrayTest.class); + + /** + * Test oper logic emits correct results + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testNodeProcessing() throws Exception + { + InvertIndexArray<String,String> oper = new InvertIndexArray<String,String>(); + CollectorTestSink indexSink = new CollectorTestSink(); + + Sink inSink = oper.data.getSink(); + oper.index.setSink(indexSink); + + oper.beginWindow(0); + + HashMap<String, ArrayList> input = new HashMap<String, ArrayList>(); + ArrayList<String> alist = new ArrayList<String>(); + alist.add("str"); + alist.add("str1"); + input.put("a", alist); + input.put("b", alist); + inSink.put(input); + + alist = new ArrayList<String>(); + input = new HashMap<String, ArrayList>(); + alist.add("blah"); + alist.add("str1"); + input.put("c", alist); + inSink.put(input); + + oper.endWindow(); + + Assert.assertEquals("number emitted tuples", 3, indexSink.collectedTuples.size()); + for (Object o: indexSink.collectedTuples) { + log.debug(o.toString()); + HashMap<String, ArrayList<String>> output = (HashMap<String, ArrayList<String>>)o; + for (Map.Entry<String, ArrayList<String>> e: output.entrySet()) { + String key = e.getKey(); + alist = e.getValue(); + if (key.equals("str1")) { + Assert.assertEquals("Index for \"str1\" contains \"a\"", true, alist.contains("a")); + Assert.assertEquals("Index for \"str1\" contains \"b\"", true, alist.contains("b")); + Assert.assertEquals("Index for \"str1\" contains \"c\"", true, alist.contains("c")); + + } else if (key.equals("str")) { + Assert.assertEquals("Index for \"str1\" contains \"a\"", true, alist.contains("a")); + Assert.assertEquals("Index for \"str1\" contains \"b\"", true, alist.contains("b")); + Assert.assertEquals("Index for \"str1\" contains \"c\"", false, alist.contains("c")); + + } else if (key.equals("blah")) { + Assert.assertEquals("Index for \"str1\" contains \"a\"", false, alist.contains("a")); + Assert.assertEquals("Index for \"str1\" contains \"b\"", false, alist.contains("b")); + Assert.assertEquals("Index for \"str1\" contains \"c\"", true, alist.contains("c")); + } + } + } + } +}
