Updated algo & working on math operators
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/8f00cefa Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/8f00cefa Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/8f00cefa Branch: refs/heads/master Commit: 8f00cefa2a14756a65e0baad48db0d52e2fe66a3 Parents: 2e47b4c Author: Lakshmi Prasanna Velineni <[email protected]> Authored: Wed Aug 24 20:54:53 2016 -0700 Committer: Lakshmi Prasanna Velineni <[email protected]> Committed: Thu Sep 1 09:12:43 2016 -0700 ---------------------------------------------------------------------- .../datatorrent/apps/logstream/Application.java | 5 +- .../contrib/sqlite/SqliteStreamOperator.java | 4 +- .../misc/algo/AbstractStreamPatternMatcher.java | 174 +++++++++++ .../contrib/misc/algo/AllAfterMatchMap.java | 120 ++++++++ .../malhar/contrib/misc/algo/DistinctMap.java | 113 +++++++ .../malhar/contrib/misc/algo/FilterKeyVals.java | 163 ++++++++++ .../contrib/misc/algo/FilterKeysHashMap.java | 184 +++++++++++ .../malhar/contrib/misc/algo/FilterKeysMap.java | 196 ++++++++++++ .../malhar/contrib/misc/algo/FirstMatchMap.java | 117 +++++++ .../apex/malhar/contrib/misc/algo/FirstN.java | 113 +++++++ .../contrib/misc/algo/FirstTillMatch.java | 116 +++++++ .../contrib/misc/algo/InsertSortDesc.java | 136 +++++++++ .../malhar/contrib/misc/algo/InvertIndex.java | 146 +++++++++ .../contrib/misc/algo/InvertIndexArray.java | 130 ++++++++ .../malhar/contrib/misc/algo/LastMatchMap.java | 112 +++++++ .../contrib/misc/algo/LeastFrequentKeyMap.java | 149 +++++++++ .../misc/algo/LeastFrequentKeyValueMap.java | 107 +++++++ .../contrib/misc/algo/MostFrequentKeyMap.java | 142 +++++++++ .../misc/algo/MostFrequentKeyValueMap.java | 110 +++++++ .../apex/malhar/contrib/misc/algo/Sampler.java | 121 ++++++++ .../apex/malhar/contrib/misc/math/Change.java | 119 ++++++++ .../malhar/contrib/misc/math/ChangeAlert.java | 120 ++++++++ 
.../contrib/misc/math/ChangeAlertKeyVal.java | 131 ++++++++ .../contrib/misc/math/ChangeAlertMap.java | 125 ++++++++ .../malhar/contrib/misc/math/ChangeKeyVal.java | 125 ++++++++ .../contrib/misc/math/CompareExceptMap.java | 131 ++++++++ .../malhar/contrib/misc/math/CompareMap.java | 88 ++++++ .../malhar/contrib/misc/math/CountKeyVal.java | 116 +++++++ .../malhar/contrib/misc/math/ExceptMap.java | 104 +++++++ .../apex/malhar/contrib/misc/math/Quotient.java | 111 +++++++ .../malhar/contrib/misc/math/QuotientMap.java | 239 +++++++++++++++ .../malhar/contrib/misc/math/SumCountMap.java | 305 +++++++++++++++++++ .../streamquery/AbstractSqlStreamOperator.java | 192 ++++++++++++ .../misc/streamquery/DeleteOperator.java | 88 ++++++ .../streamquery/DerbySqlStreamOperator.java | 200 ++++++++++++ .../misc/streamquery/GroupByHavingOperator.java | 230 ++++++++++++++ .../misc/streamquery/InnerJoinOperator.java | 212 +++++++++++++ .../misc/streamquery/OrderByOperator.java | 181 +++++++++++ .../contrib/misc/streamquery/OrderByRule.java | 99 ++++++ .../misc/streamquery/OuterJoinOperator.java | 123 ++++++++ .../streamquery/SelectFunctionOperator.java | 129 ++++++++ .../misc/streamquery/SelectOperator.java | 113 +++++++ .../misc/streamquery/SelectTopOperator.java | 131 ++++++++ .../misc/streamquery/UpdateOperator.java | 111 +++++++ .../streamquery/condition/BetweenCondition.java | 107 +++++++ .../condition/CompoundCondition.java | 132 ++++++++ .../condition/EqualValueCondition.java | 99 ++++++ .../condition/HavingCompareValue.java | 79 +++++ .../streamquery/condition/HavingCondition.java | 58 ++++ .../misc/streamquery/condition/InCondition.java | 94 ++++++ .../streamquery/condition/LikeCondition.java | 105 +++++++ .../streamquery/function/AverageFunction.java | 82 +++++ .../streamquery/function/CountFunction.java | 87 ++++++ .../streamquery/function/FirstLastFunction.java | 113 +++++++ .../streamquery/function/FunctionIndex.java | 95 ++++++ 
.../streamquery/function/MaxMinFunction.java | 105 +++++++ .../misc/streamquery/function/SumFunction.java | 64 ++++ .../streamquery/index/BinaryExpression.java | 75 +++++ .../misc/streamquery/index/MidIndex.java | 82 +++++ .../streamquery/index/NegateExpression.java | 61 ++++ .../streamquery/index/RoundDoubleIndex.java | 64 ++++ .../misc/streamquery/index/StringCaseIndex.java | 66 ++++ .../misc/streamquery/index/StringLenIndex.java | 60 ++++ .../misc/streamquery/index/SumExpression.java | 65 ++++ .../misc/streamquery/index/UnaryExpression.java | 78 +++++ .../contrib/misc/streamquery/package-info.java | 23 ++ .../algo/AbstractStreamPatternMatcherTest.java | 173 +++++++++++ .../contrib/misc/algo/AllAfterMatchMapTest.java | 93 ++++++ .../contrib/misc/algo/DistinctMapTest.java | 111 +++++++ .../contrib/misc/algo/FilterKeyValsTest.java | 127 ++++++++ .../misc/algo/FilterKeysHashMapTest.java | 151 +++++++++ .../contrib/misc/algo/FilterKeysMapTest.java | 121 ++++++++ .../contrib/misc/algo/FirstMatchMapTest.java | 103 +++++++ .../malhar/contrib/misc/algo/FirstNTest.java | 123 ++++++++ .../contrib/misc/algo/FirstTillMatchTest.java | 110 +++++++ .../contrib/misc/algo/InsertSortDescTest.java | 97 ++++++ .../contrib/misc/algo/InvertIndexArrayTest.java | 101 ++++++ .../contrib/misc/algo/InvertIndexTest.java | 104 +++++++ .../contrib/misc/algo/LastMatchMapTest.java | 105 +++++++ .../misc/algo/LeastFrequentKeyMapTest.java | 117 +++++++ .../misc/algo/LeastFrequentKeyValueMapTest.java | 109 +++++++ .../malhar/contrib/misc/algo/MatchMapTest.java | 83 +++++ .../misc/algo/MostFrequentKeyMapTest.java | 118 +++++++ .../misc/algo/MostFrequentKeyValueMapTest.java | 109 +++++++ .../malhar/contrib/misc/algo/SamplerTest.java | 64 ++++ .../misc/math/ChangeAlertKeyValTest.java | 108 +++++++ .../contrib/misc/math/ChangeAlertMapTest.java | 116 +++++++ .../contrib/misc/math/ChangeAlertTest.java | 83 +++++ .../contrib/misc/math/ChangeKeyValTest.java | 112 +++++++ 
.../malhar/contrib/misc/math/ChangeTest.java | 85 ++++++ .../contrib/misc/math/CompareExceptMapTest.java | 98 ++++++ .../contrib/misc/math/CompareMapTest.java | 83 +++++ .../contrib/misc/math/CountKeyValTest.java | 83 +++++ .../malhar/contrib/misc/math/ExceptMapTest.java | 85 ++++++ .../contrib/misc/math/QuotientMapTest.java | 94 ++++++ .../malhar/contrib/misc/math/QuotientTest.java | 104 +++++++ .../contrib/misc/math/SumCountMapTest.java | 156 ++++++++++ .../misc/streamquery/DeleteOperatorTest.java | 80 +++++ .../streamquery/FullOuterJoinOperatorTest.java | 93 ++++++ .../misc/streamquery/GroupByOperatorTest.java | 97 ++++++ .../misc/streamquery/HavingOperatorTest.java | 99 ++++++ .../misc/streamquery/InnerJoinOperatorTest.java | 92 ++++++ .../streamquery/LeftOuterJoinOperatorTest.java | 93 ++++++ .../misc/streamquery/OrderByOperatorTest.java | 95 ++++++ .../streamquery/RightOuterJoinOperatorTest.java | 95 ++++++ .../misc/streamquery/SelectOperatorTest.java | 84 +++++ .../misc/streamquery/SelectTopOperatorTest.java | 66 ++++ .../misc/streamquery/UpdateOperatorTest.java | 78 +++++ .../advanced/BetweenConditionTest.java | 90 ++++++ .../advanced/CompoundConditionTest.java | 95 ++++++ .../streamquery/advanced/InConditionTest.java | 93 ++++++ .../streamquery/advanced/LikeConditionTest.java | 84 +++++ .../streamquery/advanced/NegateIndexTest.java | 78 +++++ .../streamquery/advanced/SelectAverageTest.java | 78 +++++ .../streamquery/advanced/SelectCountTest.java | 79 +++++ .../advanced/SelectFirstLastTest.java | 79 +++++ .../streamquery/advanced/SelectMaxMinTest.java | 79 +++++ .../misc/streamquery/advanced/SumIndexTest.java | 79 +++++ demos/yahoofinance/pom.xml | 11 + .../yahoofinance/ApplicationWithDerbySQL.java | 4 +- .../lib/algo/AbstractStreamPatternMatcher.java | 173 ----------- .../datatorrent/lib/algo/AllAfterMatchMap.java | 118 ------- .../lib/algo/CompareExceptCountMap.java | 3 +- .../java/com/datatorrent/lib/algo/Distinct.java | 5 +- 
.../com/datatorrent/lib/algo/DistinctMap.java | 111 ------- .../com/datatorrent/lib/algo/FilterKeyVals.java | 161 ---------- .../datatorrent/lib/algo/FilterKeysHashMap.java | 182 ----------- .../com/datatorrent/lib/algo/FilterKeysMap.java | 194 ------------ .../com/datatorrent/lib/algo/FilterValues.java | 17 +- .../com/datatorrent/lib/algo/FirstMatchMap.java | 116 ------- .../java/com/datatorrent/lib/algo/FirstN.java | 112 ------- .../datatorrent/lib/algo/FirstTillMatch.java | 115 ------- .../datatorrent/lib/algo/InsertSortDesc.java | 135 -------- .../com/datatorrent/lib/algo/InvertIndex.java | 145 --------- .../datatorrent/lib/algo/InvertIndexArray.java | 129 -------- .../com/datatorrent/lib/algo/LastMatchMap.java | 111 ------- .../lib/algo/LeastFrequentKeyMap.java | 149 --------- .../lib/algo/LeastFrequentKeyValueMap.java | 106 ------- .../com/datatorrent/lib/algo/MatchAllMap.java | 3 +- .../com/datatorrent/lib/algo/MatchAnyMap.java | 3 +- .../java/com/datatorrent/lib/algo/MatchMap.java | 2 + .../com/datatorrent/lib/algo/MergeSort.java | 2 + .../datatorrent/lib/algo/MergeSortNumber.java | 2 + .../lib/algo/MostFrequentKeyMap.java | 141 --------- .../lib/algo/MostFrequentKeyValueMap.java | 110 ------- .../java/com/datatorrent/lib/algo/Sampler.java | 119 -------- .../com/datatorrent/lib/algo/TopNUnique.java | 3 +- .../datatorrent/lib/join/AntiJoinOperator.java | 2 +- .../java/com/datatorrent/lib/math/Average.java | 29 +- .../java/com/datatorrent/lib/math/Change.java | 117 ------- .../com/datatorrent/lib/math/ChangeAlert.java | 118 ------- .../datatorrent/lib/math/ChangeAlertKeyVal.java | 129 -------- .../datatorrent/lib/math/ChangeAlertMap.java | 123 -------- .../com/datatorrent/lib/math/ChangeKeyVal.java | 123 -------- .../datatorrent/lib/math/CompareExceptMap.java | 129 -------- .../com/datatorrent/lib/math/CompareMap.java | 86 ------ .../com/datatorrent/lib/math/CountKeyVal.java | 114 ------- .../com/datatorrent/lib/math/ExceptMap.java | 102 ------- 
.../java/com/datatorrent/lib/math/Quotient.java | 109 ------- .../com/datatorrent/lib/math/QuotientMap.java | 237 -------------- .../com/datatorrent/lib/math/SumCountMap.java | 303 ------------------ .../streamquery/AbstractSqlStreamOperator.java | 190 ------------ .../lib/streamquery/DeleteOperator.java | 86 ------ .../lib/streamquery/DerbySqlStreamOperator.java | 197 ------------ .../lib/streamquery/GroupByHavingOperator.java | 260 ---------------- .../lib/streamquery/InnerJoinOperator.java | 210 ------------- .../lib/streamquery/OrderByOperator.java | 179 ----------- .../lib/streamquery/OrderByRule.java | 97 ------ .../lib/streamquery/OuterJoinOperator.java | 121 -------- .../lib/streamquery/SelectFunctionOperator.java | 126 -------- .../lib/streamquery/SelectOperator.java | 111 ------- .../lib/streamquery/SelectTopOperator.java | 129 -------- .../lib/streamquery/UpdateOperator.java | 109 ------- .../streamquery/condition/BetweenCondition.java | 103 ------- .../condition/CompoundCondition.java | 128 -------- .../condition/EqualValueCondition.java | 96 ------ .../condition/HavingCompareValue.java | 77 ----- .../streamquery/condition/HavingCondition.java | 56 ---- .../lib/streamquery/condition/InCondition.java | 90 ------ .../condition/JoinColumnEqualCondition.java | 1 - .../streamquery/condition/LikeCondition.java | 102 ------- .../streamquery/function/AverageFunction.java | 80 ----- .../lib/streamquery/function/CountFunction.java | 85 ------ .../streamquery/function/FirstLastFunction.java | 111 ------- .../lib/streamquery/function/FunctionIndex.java | 93 ------ .../streamquery/function/MaxMinFunction.java | 103 ------- .../lib/streamquery/function/SumFunction.java | 62 ---- .../lib/streamquery/index/BinaryExpression.java | 72 ----- .../lib/streamquery/index/MidIndex.java | 78 ----- .../lib/streamquery/index/NegateExpression.java | 59 ---- .../lib/streamquery/index/RoundDoubleIndex.java | 60 ---- .../lib/streamquery/index/StringCaseIndex.java | 62 ---- 
.../lib/streamquery/index/StringLenIndex.java | 56 ---- .../lib/streamquery/index/SumExpression.java | 63 ---- .../lib/streamquery/index/UnaryExpression.java | 75 ----- .../lib/streamquery/package-info.java | 23 -- .../algo/AbstractStreamPatternMatcherTest.java | 173 ----------- .../lib/algo/AllAfterMatchMapTest.java | 92 ------ .../lib/algo/CompareExceptCountMapTest.java | 6 +- .../datatorrent/lib/algo/DistinctMapTest.java | 110 ------- .../datatorrent/lib/algo/FilterKeyValsTest.java | 126 -------- .../lib/algo/FilterKeysHashMapTest.java | 150 --------- .../datatorrent/lib/algo/FilterKeysMapTest.java | 120 -------- .../datatorrent/lib/algo/FirstMatchMapTest.java | 102 ------- .../com/datatorrent/lib/algo/FirstNTest.java | 122 -------- .../lib/algo/FirstTillMatchTest.java | 109 ------- .../lib/algo/InsertSortDescTest.java | 96 ------ .../lib/algo/InvertIndexArrayTest.java | 100 ------ .../datatorrent/lib/algo/InvertIndexTest.java | 103 ------- .../datatorrent/lib/algo/LastMatchMapTest.java | 104 ------- .../lib/algo/LeastFrequentKeyMapTest.java | 116 ------- .../lib/algo/LeastFrequentKeyValueMapTest.java | 108 ------- .../datatorrent/lib/algo/MatchAllMapTest.java | 5 +- .../datatorrent/lib/algo/MatchAnyMapTest.java | 6 +- .../com/datatorrent/lib/algo/MatchMapTest.java | 81 ----- .../lib/algo/MergeSortNumberTest.java | 4 +- .../lib/algo/MostFrequentKeyMapTest.java | 117 ------- .../lib/algo/MostFrequentKeyValueMapTest.java | 108 ------- .../com/datatorrent/lib/algo/SamplerTest.java | 63 ---- .../datatorrent/lib/algo/TopNUniqueTest.java | 5 +- .../lib/join/AntiJoinOperatorTest.java | 1 + .../com/datatorrent/lib/math/AverageTest.java | 4 +- .../lib/math/ChangeAlertKeyValTest.java | 107 ------- .../lib/math/ChangeAlertMapTest.java | 115 ------- .../datatorrent/lib/math/ChangeAlertTest.java | 82 ----- .../datatorrent/lib/math/ChangeKeyValTest.java | 111 ------- .../com/datatorrent/lib/math/ChangeTest.java | 84 ----- .../lib/math/CompareExceptMapTest.java | 97 ------ 
.../datatorrent/lib/math/CompareMapTest.java | 82 ----- .../datatorrent/lib/math/CountKeyValTest.java | 82 ----- .../com/datatorrent/lib/math/DivisionTest.java | 1 + .../com/datatorrent/lib/math/ExceptMapTest.java | 83 ----- .../lib/math/MultiplyByConstantTest.java | 2 + .../datatorrent/lib/math/QuotientMapTest.java | 92 ------ .../com/datatorrent/lib/math/QuotientTest.java | 102 ------- .../com/datatorrent/lib/math/SigmaTest.java | 3 +- .../datatorrent/lib/math/SumCountMapTest.java | 154 ---------- .../lib/streamquery/DeleteOperatorTest.java | 77 ----- .../streamquery/FullOuterJoinOperatorTest.java | 93 ------ .../lib/streamquery/GroupByOperatorTest.java | 94 ------ .../lib/streamquery/HavingOperatorTest.java | 96 ------ .../lib/streamquery/InnerJoinOperatorTest.java | 91 ------ .../streamquery/LeftOuterJoinOperatorTest.java | 92 ------ .../lib/streamquery/OrderByOperatorTest.java | 93 ------ .../streamquery/RightOuterJoinOperatorTest.java | 94 ------ .../lib/streamquery/SelectOperatorTest.java | 81 ----- .../lib/streamquery/SelectTopOperatorTest.java | 65 ---- .../lib/streamquery/UpdateOperatorTest.java | 76 ----- .../advanced/BetweenConditionTest.java | 87 ------ .../advanced/CompoundConditionTest.java | 92 ------ .../streamquery/advanced/InConditionTest.java | 90 ------ .../streamquery/advanced/LikeConditionTest.java | 81 ----- .../streamquery/advanced/NegateIndexTest.java | 75 ----- .../streamquery/advanced/SelectAverageTest.java | 75 ----- .../streamquery/advanced/SelectCountTest.java | 76 ----- .../advanced/SelectFirstLastTest.java | 76 ----- .../streamquery/advanced/SelectMaxMinTest.java | 76 ----- .../lib/streamquery/advanced/SumIndexTest.java | 76 ----- samples/pom.xml | 4 +- .../lib/algo/AllAfterMatchMapSample.java | 4 +- .../samples/lib/math/ChangeSample.java | 4 +- .../samples/lib/math/CompreMapSample.java | 4 +- .../samples/lib/math/CountKeyValSample.java | 4 +- 263 files changed, 12997 insertions(+), 12767 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java ---------------------------------------------------------------------- diff --git a/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java b/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java index 0cd3f79..98dfebd 100644 --- a/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java +++ b/apps/logstream/src/main/java/com/datatorrent/apps/logstream/Application.java @@ -25,6 +25,9 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.apex.malhar.contrib.misc.streamquery.SelectOperator; +import org.apache.apex.malhar.contrib.misc.streamquery.condition.EqualValueCondition; + import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -36,8 +39,6 @@ import com.datatorrent.lib.logs.MultiWindowDimensionAggregation; import com.datatorrent.lib.logs.MultiWindowDimensionAggregation.AggregateOperation; import com.datatorrent.lib.stream.Counter; import com.datatorrent.lib.stream.JsonByteArrayOperator; -import com.datatorrent.lib.streamquery.SelectOperator; -import com.datatorrent.lib.streamquery.condition.EqualValueCondition; import com.datatorrent.lib.streamquery.index.ColumnIndex; import com.datatorrent.lib.util.AbstractDimensionTimeBucketOperator; import com.datatorrent.lib.util.DimensionTimeBucketSumOperator; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/com/datatorrent/contrib/sqlite/SqliteStreamOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/sqlite/SqliteStreamOperator.java b/contrib/src/main/java/com/datatorrent/contrib/sqlite/SqliteStreamOperator.java index ee06272..2cb33e4 100644 --- 
a/contrib/src/main/java/com/datatorrent/contrib/sqlite/SqliteStreamOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/sqlite/SqliteStreamOperator.java @@ -21,8 +21,8 @@ package com.datatorrent.contrib.sqlite; import com.almworks.sqlite4java.SQLiteConnection; import com.almworks.sqlite4java.SQLiteException; import com.almworks.sqlite4java.SQLiteStatement; -import com.datatorrent.lib.streamquery.AbstractSqlStreamOperator; -import com.datatorrent.lib.streamquery.AbstractSqlStreamOperator.InputSchema.ColumnInfo; +import org.apache.apex.malhar.contrib.misc.streamquery.AbstractSqlStreamOperator; +import org.apache.apex.malhar.contrib.misc.streamquery.AbstractSqlStreamOperator.InputSchema.ColumnInfo; import com.datatorrent.api.Context.OperatorContext; import java.io.File; import java.util.ArrayList; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/AbstractStreamPatternMatcher.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/AbstractStreamPatternMatcher.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/AbstractStreamPatternMatcher.java new file mode 100644 index 0000000..38b176c --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/AbstractStreamPatternMatcher.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.Iterator; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.commons.lang3.mutable.MutableInt; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.common.util.BaseOperator; + +/** + * <p> + * This operator searches for a given pattern in the input stream.<br> + * For example, if the pattern is defined as "aa" and the input events arrive as "a", "a", "a", then this operator + * finds 2 matches and calls processPatternFound() twice: once for events 1 and 2, once for events 2 and 3. + * </p> + * + * <br> + * <b> Stateful : Yes, </b> partial matches are kept across application windows, so a match may span windows. <br> + * <b> Partitionable : No, </b> will yield wrong result.
<br> + * + * <br> + * <b>Ports</b>:<br> + * <b>inputPort</b>: the port to receive input<br> + * + * <br> + * <b>Properties</b>:<br> + * <b>pattern</b>: The pattern that needs to be searched<br> + * + * @param <T> event type + * + * @since 2.0.0 + * @deprecated + */ +@Deprecated +@OperatorAnnotation(partitionable = false) +public abstract class AbstractStreamPatternMatcher<T> extends BaseOperator +{ + /** + * The pattern to be searched in the input stream of events + */ + @NotNull + private Pattern<T> pattern; + + // for every in-progress partial match, the index of the last pattern state it has matched + private List<MutableInt> partialMatches = Lists.newLinkedList(); + private transient MutableInt patternLength; + + /** + * Set the pattern that needs to be searched in the input stream of events + * + * @param pattern The pattern to be searched + */ + public void setPattern(Pattern<T> pattern) + { + this.pattern = pattern; + partialMatches.clear(); + patternLength = new MutableInt(pattern.getStates().length - 1); + } + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + patternLength = new MutableInt(pattern.getStates().length - 1); + } + + /** + * Get the pattern that is searched in the input stream of events + * + * @return Returns the pattern searched + */ + public Pattern<T> getPattern() + { + return pattern; + } + + public transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>() + { + @Override + public void process(T t) + { + if (pattern.checkState(t, 0)) { + partialMatches.add(new MutableInt(-1)); // start a new partial match; the loop below advances it to index 0 + } + if (partialMatches.size() > 0) { + MutableInt tempInt; + Iterator<MutableInt> itr = partialMatches.iterator(); + while (itr.hasNext()) { + tempInt = itr.next(); + tempInt.increment(); + if (!pattern.checkState(t, tempInt.intValue())) { + itr.remove(); + } else if (tempInt.equals(patternLength)) { + itr.remove(); + processPatternFound(); + } + } + } + } + }; + + /** + * Invoked once each time the input stream completes a match of the whole pattern; subclasses decide how to handle it. + */ + public
abstract void processPatternFound(); + + public static class Pattern<T> + { + /** + * The states of the pattern + */ + @NotNull + private final T[] states; + + //for kryo + private Pattern() + { + states = null; + } + + public Pattern(@NotNull T[] states) + { + this.states = states; + } + + /** + * Checks if the input state matches the state at index "index" of the pattern + * + * @param t The input state + * @param index The index to match in the pattern + * @return True if the state exists at index "index" else false + */ + public boolean checkState(T t, int index) + { + return states[index].equals(t); + } + + /** + * Get the states of the pattern + * + * @return The states of the pattern + */ + public T[] getStates() + { + return states; + } + + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/AllAfterMatchMap.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/AllAfterMatchMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/AllAfterMatchMap.java new file mode 100644 index 0000000..44118d5 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/AllAfterMatchMap.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.HashMap; +import java.util.Map; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OperatorAnnotation; + +import com.datatorrent.lib.util.BaseMatchOperator; + +/** + * This operator takes Maps, whose values are numbers, as input tuples. + * It then performs a numeric comparison on the values corresponding to one of the keys in the input tuple maps. + * Within each application window, tuples processed before the first successful comparison are not output by the operator, + * while tuples processed after and including a successful comparison are output by the operator. + * + * <p> + * A compare metric is done on input tuple based on the property "key", + * "value", and "cmp" type. All remaining tuples of the window are emitted (inclusive) once a match is made. + * The comparison is done by getting double value from the Number. + * This module is a pass through<br> + * <br> + * <b> Stateful : Yes, </b> the match flag is kept across tuples and reset at the start of every application window. <br> + * <b> Partitions : No, </b> will yield wrong result. <br> + * <br> + * <br> + * <b>Ports</b>:<br> + * <b>data</b>: expects Map<K,V extends Number><br> + * <b>allafter</b>: emits HashMap<K,V extends Number> once the compare function + * has returned true in the current window<br> + * <br> + * <b>Properties</b>:<br> + * <b>key</b>: The key on which compare is done<br> + * <b>value</b>: The value to compare with<br> + * <b>cmp</b>: The compare function.
Supported values are "lte", "lt", "eq", + * "neq", "gt", "gte". Default is "eq"<br> + * <br> + * <b>Specific compile time checks</b>:<br> + * Key must be non empty<br> + * Value must be able to convert to a "double"<br> + * Compare string, if specified, must be one of "lte", "lt", "eq", "neq", "gt", + * "gte"<br> + * <b>Specific run time checks</b>: None<br> + * <br> + * </p> + * + * @displayName Emit All After Match (Number) + * @category Rules and Alerts + * @tags filter, compare, numeric, key value + * + * @since 0.3.2 + * @deprecated + */ +@Deprecated +@OperatorAnnotation(partitionable = false) +public class AllAfterMatchMap<K, V extends Number> extends + BaseMatchOperator<K, V> +{ + /** + * The input port on which tuples are received. + */ + public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>() + { + /** + * Emits {@code cloneTuple(tuple)} once a match has occurred in this window; otherwise compares the value under the key and starts emitting (this tuple included) when the comparison succeeds. + */ + @Override + public void process(Map<K, V> tuple) + { + if (doemit) { + allafter.emit(cloneTuple(tuple)); + return; + } + V v = tuple.get(getKey()); + if (v == null) { // key missing: nothing to compare, drop the tuple + return; + } + if (compareValue(v.doubleValue())) { + doemit = true; + allafter.emit(cloneTuple(tuple)); + } + } + }; + + /** + * The output port on which all tuples at and after the first match of each window are emitted.
+ */ + public final transient DefaultOutputPort<HashMap<K, V>> allafter = new DefaultOutputPort<HashMap<K, V>>(); + boolean doemit = false; // true once a tuple in the current window has matched; cleared by beginWindow + + /** + * Clears the match flag so that each application window suppresses tuples until its own first match. + * + * @param windowId id of the window that is starting (not used) + */ + @Override + public void beginWindow(long windowId) + { + doemit = false; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/DistinctMap.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/DistinctMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/DistinctMap.java new file mode 100644 index 0000000..426c2e5 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/DistinctMap.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.HashMap; +import java.util.Map; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OperatorAnnotation; + +import com.datatorrent.lib.util.BaseKeyValueOperator; +import com.datatorrent.lib.util.UnifierHashMap; + +/** + * This operator computes and emits distinct key,val pairs (i.e. drops duplicates) within each application window. + * <p> + * Computes and emits distinct key,val pairs (i.e. drops duplicates) + * </p> + * <p> + * This is a pass through operator<br> + * <br> + * This module is same as a "FirstOf" metric on any key,val pair. At end of window the cache of seen pairs is cleared.<br> + * <br> + * <b>Stateful : Yes, </b> tuples are compared within an application window; the cache is cleared at the end of every window. <br> + * <b>Partitions : Yes, </b> distinct output is unified by unifier hash map operator. <br> + * <br> + * <b>Ports</b>:<br> + * <b>data</b>: Input data port expects Map<K,V><br> + * <b>distinct</b>: Output data port, emits HashMap<K,V>(1)<br> + * <br> + * </p> + * + * @displayName Distinct Key Value Merge + * @category Stream Manipulators + * @tags filter, unique, key value + * + * @since 0.3.2 + * @deprecated + */ + +@Deprecated +@OperatorAnnotation(partitionable = true) +public class DistinctMap<K, V> extends BaseKeyValueOperator<K, V> +{ + /** + * The input port on which key value pairs are received. + */ + public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>() + { + /** + * Emits each key,val pair of the tuple, as a single-entry HashMap, the first time that pair is seen in
the current window, and records the pair in the cache + * so that later duplicates in the same window are dropped + */ + @Override + public void process(Map<K, V> tuple) + { + for (Map.Entry<K, V> e: tuple.entrySet()) { + HashMap<V, Object> vals = mapkeyval.get(e.getKey()); + if ((vals == null) || !vals.containsKey(e.getValue())) { + HashMap<K, V> otuple = new HashMap<K, V>(1); + otuple.put(cloneKey(e.getKey()), cloneValue(e.getValue())); + distinct.emit(otuple); + if (vals == null) { + vals = new HashMap<V, Object>(); + mapkeyval.put(cloneKey(e.getKey()), vals); + } + vals.put(cloneValue(e.getValue()), null); // inner map is used as a set: only its keys matter + } + } + } + }; + + /** + * The output port on which distinct key value pairs are emitted. + */ + public final transient DefaultOutputPort<HashMap<K, V>> distinct = new DefaultOutputPort<HashMap<K, V>>() + { + @Override + public Unifier<HashMap<K, V>> getUnifier() + { + return new UnifierHashMap<K, V>(); + } + }; + + + protected HashMap<K, HashMap<V, Object>> mapkeyval = new HashMap<K, HashMap<V, Object>>(); + + /** + * Clears the cache of seen pairs, so duplicates are only suppressed within a single window + */ + @Override + public void endWindow() + { + mapkeyval.clear(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeyVals.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeyVals.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeyVals.java new file mode 100644 index 0000000..9925d69 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeyVals.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.HashMap; +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.api.annotation.Stateless; + +import com.datatorrent.lib.util.BaseKeyOperator; + +/** + * This operator filters the incoming stream of tuples using a set of specified key value pairs. + * Key,val pairs that match the filter are emitted individually by the operator. + * <p> + * Filters the incoming stream based on specified key,val pairs, and emits those that match the filter. If + * property "inverse" is set to "true", then all key,val pairs except those specified in the keyvals parameter are emitted + * </p> + * <p> + * Operator assumes that the key, val pairs are immutable objects. If this operator has to be used for mutable objects, + * override "cloneKey()" to make copy of K, and "cloneValue()" to make copy of V.<br> + * This is a pass through node<br> + * <br> + * <b>StateFull : No, </b> tuples are processed independently within the current window. <br> + * <b>Partitions : Yes, </b> no dependency among input tuples.
<br> + * <br> + * <b>Ports</b>:<br> + * <b>data</b>: expects HashMap<K,V><br> + * <b>filter</b>: emits HashMap<K,V>(1)<br> + * <br> + * <b>Properties</b>:<br> + * <b>keyvals</b>: The key,val pairs to pass through; the rest are filtered/dropped.<br> + * <br> + * </p> + * + * @displayName Filter Keyval Pairs + * @category Rules and Alerts + * @tags filter, key value + * + * @since 0.3.2 + * @deprecated + */ +@Deprecated +@Stateless +@OperatorAnnotation(partitionable = true) +public class FilterKeyVals<K,V> extends BaseKeyOperator<K> +{ + /** + * The input port on which key value pairs are received. + */ + public final transient DefaultInputPort<HashMap<K, V>> data = new DefaultInputPort<HashMap<K, V>>() + { + /** + * Processes incoming tuples one key,val pair at a time; each pair that matches the filter is emitted as its own one-entry map. + * With inverse set to true, only the pairs that do NOT match are emitted. + */ + @Override + public void process(HashMap<K, V> tuple) + { + for (Map.Entry<K, V> e: tuple.entrySet()) { + entry.clear(); + entry.put(e.getKey(),e.getValue()); + boolean contains = keyvals.containsKey(entry); + if (contains != inverse) { + HashMap<K, V> dtuple = new HashMap<K,V>(1); + dtuple.put(cloneKey(e.getKey()), cloneValue(e.getValue())); + filter.emit(dtuple); + } + } + } + }; + + /** + * The output port on which filtered key value pairs are emitted. + */ + public final transient DefaultOutputPort<HashMap<K, V>> filter = new DefaultOutputPort<HashMap<K, V>>(); + + @NotNull() + HashMap<HashMap<K,V>,Object> keyvals = new HashMap<HashMap<K,V>,Object>(); + boolean inverse = false; + private transient HashMap<K,V> entry = new HashMap<K,V>(1); + + /** + * Gets the inverse property. + * @return inverse + */ + public boolean getInverse() + { + return inverse; + } + + /** + * If false (default) only matching key,val pairs are emitted. If true only non-matching key,val pairs are emitted.
+ * @param val true to emit only the pairs that do not match the filter + */ + public void setInverse(boolean val) + { + inverse = val; + } + + /** + * Gets the key,val pairs used as the filter; each pair is stored as a one-entry map key with a null value. + * @return keyvals hash + */ + @NotNull() + public HashMap<HashMap<K,V>,Object> getKeyVals() + { + return keyvals; + } + + /** + * Adds each key,val pair of the given map to the filter list. + * Each pair is stored as its own one-entry map so that process() can match it by map equality. + * @param map with key,val pairs to set as filters + */ + public void setKeyVals(HashMap<K,V> map) + { + for (Map.Entry<K, V> e: map.entrySet()) { + HashMap<K, V> kvpair = new HashMap<K, V>(1); + kvpair.put(cloneKey(e.getKey()), cloneValue(e.getValue())); + keyvals.put(kvpair, null); + } + } + + /** + * Clears the filter list + */ + public void clearKeys() + { + keyvals.clear(); + } + + /** + * Clones V object. By default assumes immutable object (i.e. a copy is not made). If object is mutable, override this method + * @param v value to be cloned + * @return returns v as is (assumes immutable object) + */ + public V cloneValue(V v) + { + return v; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysHashMap.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysHashMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysHashMap.java new file mode 100644 index 0000000..cfee74c --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysHashMap.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.HashMap; +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.api.annotation.Stateless; + +import com.datatorrent.lib.util.BaseKeyOperator; + + +/** + * This operator filters the incoming stream of key value pairs based on the keys specified by property "keys". + * <p> + * Filters the incoming stream based on keys specified by property "keys". If + * property "inverse" is set to "true", then all keys except those specified by "keys" are emitted + * </p> + * <p> + * Operator assumes that the key, val pairs are immutable objects. If this operator has to be used for mutable objects, + * override "cloneKey()" to make copy of K, and "cloneValue()" to make copy of V.<br> + * This is a pass through node.<br> + * <br> + * <b>StateFull : No, </b> tuple are processed in current window. <br> + * <b>Partitions : Yes, </b> no dependency among input tuples. <br> + * <br> + * <b>Ports</b>:<br> + * <b>data</b>: Expects Map<K, HashMap<K,V>>. Filters are applied only to the keys of the inner (second-level) hash map; outer entries left with no inner pairs are dropped.<br> + * <b>filter</b>: Emits HashMap<K, HashMap<K,V>>, at most one per input tuple.<br> + * <br> + * <b>Properties</b>:<br> + * <b>keys</b>: The keys to pass through, rest are filtered/dropped.
A comma separated list of keys.<br> + * <br> + * </p> + * + * @displayName Filter Keyval Pairs By Key HashMap + * @category Stream Manipulators + * @tags filter, key value + * + * @since 0.3.2 + * @deprecated + */ +@Deprecated +@Stateless +@OperatorAnnotation(partitionable = true) +public class FilterKeysHashMap<K, V> extends BaseKeyOperator<K> +{ + /** + * Filter keys map; only its keys are consulted (the setters store null values). + */ + @NotNull() + HashMap<K, V> keys = new HashMap<K, V>(); + + /** + * If true, emits only inner pairs whose key is NOT in the filter map. + */ + boolean inverse = false; + + /** + * The input port on which key value pairs are received. + */ + public final transient DefaultInputPort<Map<K, HashMap<K, V>>> data = new DefaultInputPort<Map<K, HashMap<K, V>>>() + { + /** + * Filters the inner map of each outer entry one key,val at a time, and emits one combined tuple if at least one inner key makes the cut. + * With inverse set to true, only inner keys NOT in the filter are kept. + */ + @Override + public void process(Map<K, HashMap<K, V>> tuple) + { + HashMap<K, HashMap<K, V>> dtuple = null; + for (Map.Entry<K, HashMap<K, V>> e: tuple.entrySet()) { + HashMap<K, V> dtuple2 = null; + for (Map.Entry<K, V> e2: e.getValue().entrySet()) { + boolean contains = keys.containsKey(e2.getKey()); + if ((contains && !inverse) || (!contains && inverse)) { + if (dtuple2 == null) { + dtuple2 = new HashMap<K, V>(4); // usually the filter keys are very few, so 4 is just fine + } + dtuple2.put(cloneKey(e2.getKey()), cloneValue(e2.getValue())); + } + } + if (dtuple == null && dtuple2 != null) { + dtuple = new HashMap<K, HashMap<K, V>>(); + } + if (dtuple != null && dtuple2 != null) { + dtuple.put(cloneKey(e.getKey()), dtuple2); + } + } + if (dtuple != null) { + filter.emit(dtuple); + } + } + }; + + /** + * The output port on which filtered key value pairs are emitted.
+ */ + public final transient DefaultOutputPort<HashMap<K, HashMap<K, V>>> filter = new DefaultOutputPort<HashMap<K, HashMap<K, V>>>(); + + /** + * Gets the inverse property. + * + * @return inverse + */ + public boolean getInverse() + { + return inverse; + } + + /** + * If false (default) only inner pairs whose key is in the filter are emitted; if true only pairs whose key is NOT in the filter are emitted. + * + * @param val + */ + public void setInverse(boolean val) + { + inverse = val; + } + + /** + * Adds a key to the filter list + * + * @param str key to add + */ + public void setKey(K str) + { + keys.put(str, null); + } + + /** + * Adds the list of keys to the filter list + * + * @param list keys to add; a null array is ignored + */ + public void setKeys(K[] list) + { + if (list != null) { + for (K e: list) { + keys.put(e, null); + } + } + } + + /** + * Clears the filter list + */ + public void clearKeys() + { + keys.clear(); + } + + /** + * Clones V object. By default assumes immutable object (i.e. a copy is not made). If object is mutable, override this method + * + * @param v value to be cloned + * @return returns v as is (assumes immutable object) + */ + public V cloneValue(V v) + { + return v; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysMap.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysMap.java new file mode 100644 index 0000000..43386f0 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysMap.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.HashMap; +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.api.annotation.Stateless; + +import com.datatorrent.lib.util.BaseKeyOperator; +import com.datatorrent.lib.util.UnifierHashMap; + +/** + * This operator filters the incoming stream of key value pairs based on the keys specified by property "keys". + * <p> + * Filters the incoming stream based on keys specified by property "keys". If + * property "inverse" is set to "true", then all keys except those specified by "keys" are emitted + * </p> + * <p> + * Operator assumes that the key, val pairs are immutable objects. If this operator has to be used for mutable objects, + * override "cloneKey()" to make copy of K, and "cloneValue()" to make copy of V.<br> + * This is a pass through node<br> + * <br> + * <b>StateFull : No, </b> tuple are processed in current window. <br> + * <b>Partitions : Yes, </b> no dependency among input tuples. <br> + * <br> + * <b>Ports</b>:<br> + * <b>data</b>: Expects Map<K,V><br> + * <b>filter</b>: Emits HashMap<K,V>, at most one per input tuple<br> + * <br> + * <b>Properties</b>:<br> + * <b>keys</b>: The keys to pass through, rest are filtered/dropped.
A comma separated list of keys<br> + * <br> + * </p> + * + * @displayName Filter Keyval Pairs By Key Generic + * @category Rules and Alerts + * @tags filter, key value + * + * @since 0.3.2 + * @deprecated + */ +@Deprecated +@Stateless +@OperatorAnnotation(partitionable = true) +public class FilterKeysMap<K,V> extends BaseKeyOperator<K> +{ + /** + * Filter keys map; only its keys are consulted. + */ + @NotNull() + HashMap<K, V> keys = new HashMap<K, V>(); + + /** + * If true, emits only pairs whose key is NOT in the filter map. + */ + boolean inverse = false; + + /** + * The input port on which key value pairs are received. + */ + public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>() + { + /** + * Processes incoming tuples one key,val at a time. Emits a single map of the surviving pairs if at least one key makes the cut + * With inverse set to true, only keys NOT in the filter are kept + */ + @Override + public void process(Map<K, V> tuple) + { + HashMap<K, V> dtuple = null; + for (Map.Entry<K, V> e: tuple.entrySet()) { + boolean contains = keys.containsKey(e.getKey()); + if ((contains && !inverse) || (!contains && inverse)) { + if (dtuple == null) { + dtuple = new HashMap<K, V>(4); // usually the filter keys are very few, so 4 is just fine + } + dtuple.put(cloneKey(e.getKey()), cloneValue(e.getValue())); + } + } + if (dtuple != null) { + filter.emit(dtuple); + } + } + }; + + /** + * The output port on which filtered key value pairs are emitted. + */ + public final transient DefaultOutputPort<HashMap<K, V>> filter = new DefaultOutputPort<HashMap<K, V>>() + { + @Override + public Unifier<HashMap<K, V>> getUnifier() + { + return new UnifierHashMap<K, V>(); + } + }; + + /** + * Gets the inverse property. If false only matching keys are emitted; if true only non-matching keys are emitted. + * @return inverse + */ + public boolean getInverse() + { + return inverse; + } + + + /** + * Sets the inverse property. If false (default) only matching keys are emitted; if true only non-matching keys are emitted.
+ * @param val + */ + public void setInverse(boolean val) + { + inverse = val; + } + + /** + * Adds a key to the filter list + * @param str key to add + */ + public void setKey(K str) + { + keys.put(str, null); + } + + /** + * Adds the list of keys to the filter list + * @param list keys to add; a null array is ignored + */ + public void setKeys(K[] list) + { + if (list != null) { + for (K e: list) { + keys.put(e, null); + } + } + } + + /** + * Replaces the filter map (the given map itself is stored, not copied). Only its keys are consulted; the values should be null. + * @param keys the new filter map + */ + public void setKeys(HashMap<K, V> keys) + { + this.keys = keys; + } + + /** + * Gets the keys to filter. + * @return the live filter map (not a copy) whose keys are the filter keys. + */ + public HashMap<K, V> getKeys() + { + return keys; + } + + /** + * Clears the filter list + */ + public void clearKeys() + { + keys.clear(); + } + + /** + * Clones V object. By default assumes immutable object (i.e. a copy is not made). If object is mutable, override this method + * @param v value to be cloned + * @return returns v as is (assumes immutable object) + */ + public V cloneValue(V v) + { + return v; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstMatchMap.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstMatchMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstMatchMap.java new file mode 100644 index 0000000..7649706 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstMatchMap.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.HashMap; +import java.util.Map; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OperatorAnnotation; + +import com.datatorrent.lib.util.BaseMatchOperator; + +/** + * This operator filters the incoming stream of key value pairs by obtaining the values corresponding to a specified key, + * and comparing those values to a specified number. The first tuple (the whole map), in each window, to satisfy the comparison is emitted. + * <p> + * A compare metric on a Number tuple based on the property "key", "value", and "cmp"; the first match is emitted. + * The comparison is done by getting double value from the Number. + * </p> + * <p> + * This module is a pass through<br> + * The operator by default assumes immutable keys. If the key is mutable, use cloneKey to make a copy<br> + * <br> + * <b>StateFull : Yes, </b> tuple are processed in current window. <br> + * <b>Partitions : No, </b>will yield wrong results. <br> + * <br> + * <br> + * <b>Ports</b>:<br> + * <b>data</b>: expects Map<K,V extends Number><br> + * <b>first</b>: emits HashMap<K,V><br> + * <br> + * <b>Properties</b>:<br> + * <b>key</b>: The key on which compare is done<br> + * <b>value</b>: The value to compare with<br> + * <b>cmp</b>: The compare function.
Supported values are "lte", "lt", "eq", "neq", "gt", "gte". Default is "eq"<br> + * <br> + * <b>Specific compile time checks</b>:<br> + * Key must be non empty<br> + * Value must be able to convert to a "double"<br> + * Compare string, if specified, must be one of "lte", "lt", "eq", "neq", "gt", "gte"<br> + * <br> + * </p> + * + * @displayName Emit First Match (Number) + * @category Rules and Alerts + * @tags filter, key value, numeric + * + * @since 0.3.2 + * @deprecated + */ +@Deprecated +@OperatorAnnotation(partitionable = false) +public class FirstMatchMap<K, V extends Number> extends BaseMatchOperator<K,V> +{ + /** + * Set once a matching tuple has been emitted in the current window; reset in beginWindow. + */ + boolean emitted = false; + + /** + * The port on which key value pairs are received. + */ + public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>() + { + /** + * Checks if the configured key exists in the tuple and its value satisfies the comparison. If so the tuple is emitted and the emitted flag is set + * to true, so later tuples in this window are ignored + */ + @Override + public void process(Map<K, V> tuple) + { + if (emitted) { + return; + } + V val = tuple.get(getKey()); + if (val == null) { // skip if key does not exist + return; + } + if (compareValue(val.doubleValue())) { + first.emit(cloneTuple(tuple)); + emitted = true; + } + } + }; + + /** + * The output port on which the first satisfying tuple is emitted (at most once per window).
+ */ + public final transient DefaultOutputPort<HashMap<K, V>> first = new DefaultOutputPort<HashMap<K, V>>(); + + /** + * Resets the emitted flag so the next window can emit its own first match + * @param windowId id of the window being started + */ + @Override + public void beginWindow(long windowId) + { + emitted = false; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstN.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstN.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstN.java new file mode 100644 index 0000000..f067fbb --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstN.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang.mutable.MutableInt; + +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OperatorAnnotation; + +import com.datatorrent.lib.util.AbstractBaseNOperatorMap; + +/** + * This operator filters the incoming stream of key value pairs by emitting, for each key, the first N key value pairs seen in each window. + * <p> + * Emits the first N key,val pairs of every key. + * </p> + * <p> + * This module is a pass through module<br> + * <br> + * <b>StateFull : Yes, </b> per-key counts are kept within the current window and cleared in endWindow(). <br> + * <b>Partitions : No, </b> will yield wrong results. <br> + * <br> + * <b>Ports</b>:<br> + * <b>data</b>: Input data port expects HashMap<K,V><br> + * <b>first</b>: Output data port, emits HashMap<K,V><br> + * <br> + * <b>Properties</b>:<br> + * <b>N</b>: The number of key,val pairs to be emitted per key in each window<br> + * <br> + * <b>Specific compile time checks are</b>:<br> + * N: Has to be >= 1<br> + * <br> + * <br> + * </p> + * + * @displayName First N Keyval Pairs Matching Key + * @category Rules and Alerts + * @tags filter, key value + * @deprecated + * @since 0.3.2 + */ +@Deprecated +@OperatorAnnotation(partitionable = false) +public class FirstN<K,V> extends AbstractBaseNOperatorMap<K, V> +{ + /** + * Number of times each key has been seen in the current window.
+ */ + HashMap<K, MutableInt> keycount = new HashMap<K, MutableInt>(); + + /** + * Counts each key in the current window and emits the key,val pair while that key has been seen at most N times + * @param tuple map of key,val pairs to process + */ + @Override + public void processTuple(Map<K, V> tuple) + { + for (Map.Entry<K, V> e: tuple.entrySet()) { + MutableInt count = keycount.get(e.getKey()); + if (count == null) { + count = new MutableInt(0); + keycount.put(e.getKey(), count); + } + count.increment(); + if (count.intValue() <= getN()) { + first.emit(cloneTuple(e.getKey(), e.getValue())); + } + } + } + + /** + * The output port on which the first N key value pairs of each key are emitted. + */ + public final transient DefaultOutputPort<HashMap<K, V>> first = new DefaultOutputPort<HashMap<K, V>>(); + + /** + * Clears the per-key counts so the next window starts anew + */ + @Override + public void endWindow() + { + keycount.clear(); + } + + /** + * Sets N, the number of key,val pairs emitted for each key in every window. + * @param val the value of N; has to be >= 1 + */ + @Override + public void setN(int val) + { + super.setN(val); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstTillMatch.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstTillMatch.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstTillMatch.java new file mode 100644 index 0000000..c32a0f2 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/FirstTillMatch.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.HashMap; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OperatorAnnotation; + +import com.datatorrent.lib.util.BaseMatchOperator; + +/** + * This operator filters the incoming stream of key value pairs by obtaining the values corresponding to a specified key, + * and comparing those values to a specified number. For each window, tuples are emitted by the operator until a value satisfying the comparison is encountered; tuples without the key are dropped. + * <p> + * All key,val pairs with val sub-classed from Number are emitted till the first match; A compare metric is done based on the property "key", + * "value", and "cmp". From then on no tuple is emitted in that window; the matching tuple itself is not emitted either. The comparison is done by getting double value of the Number. + * </p> + * <p> + * This module is a pass through<br> + * <br> + * <b>StateFull : Yes, </b> tuple are processed in current window. <br> + * <b>Partitions : No, </b>will yield wrong results. <br> + * <br> + * <br> + * <b>Ports</b>:<br> + * <b>data</b>: Input port, expects HashMap<K,V><br> + * <b>first</b>: Output port, emits HashMap<K,V> while the compare function returns false (i.e. until the first match)<br> + * <br> + * <b>Properties</b>:<br> + * <b>key</b>: The key on which compare is done<br> + * <b>value</b>: The value to compare with<br> + * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq", "neq", "gt", "gte".
Default is "eq"<br> + * <br> + * <b>Specific compile time checks</b>:<br> + * Key must be non empty<br> + * Value must be able to convert to a "double"<br> + * Compare string, if specified, must be one of "lte", "lt", "eq", "neq", "gt", "gte"<br> + * <br> + * </p> + * + * @displayName Emit Keyval Pairs Until Match (Number) + * @category Rules and Alerts + * @tags filter, key value, numeric + * @deprecated + * @since 0.3.2 + */ +@Deprecated +@OperatorAnnotation(partitionable = false) +public class FirstTillMatch<K, V extends Number> extends BaseMatchOperator<K, V> +{ + /** + * Set once the first matching tuple is seen in the current window (that tuple itself is not emitted); reset in beginWindow. + */ + boolean emitted = false; + + /** + * The input port on which incoming key value pairs are received. + */ + public final transient DefaultInputPort<HashMap<K, V>> data = new DefaultInputPort<HashMap<K, V>>() + { + /** + * Compares the value for the configured key with the match condition. Till a match is found tuples are emitted; tuples without the key are skipped. + * Once a match is found, state is set to emitted; the matching tuple itself is not emitted and no more tuples are compared (no more emits). + */ + @Override + public void process(HashMap<K, V> tuple) + { + if (emitted) { + return; + } + V val = tuple.get(getKey()); + if (val == null) { // skip if the key does not exist + return; + } + if (compareValue(val.doubleValue())) { + emitted = true; + } + if (!emitted) { + first.emit(cloneTuple(tuple)); + } + } + }; + + /** + * The output port on which key value pairs are emitted until the first match.
+ */ + public final transient DefaultOutputPort<HashMap<K, V>> first = new DefaultOutputPort<HashMap<K, V>>(); + + /** + * Resets the emitted flag so each window starts emitting afresh + * @param windowId id of the window being started + */ + @Override + public void beginWindow(long windowId) + { + emitted = false; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InsertSortDesc.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InsertSortDesc.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InsertSortDesc.java new file mode 100644 index 0000000..4af9091 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InsertSortDesc.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.PriorityQueue; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.AbstractBaseSortOperator; +import com.datatorrent.lib.util.ReversibleComparator; + +/** + * This operator takes the values it receives each window and outputs them in descending order at the end of each window. + * <p> + * Incoming tuple is inserted into already existing sorted list in a descending order. At the end of the window the resultant sorted list is emitted on whichever output ports are connected. + * </p> + * <p> + * <br> + * <b>StateFull : Yes, </b> tuple are compare across application window(s). <br> + * <b>Partitions : No, </b> will yield wrong results. <br> + * <br> + * <b>Ports</b>:<br> + * <b>data</b>: expects K<br> + * <b>datalist</b>: expects ArrayList<K><br> + * <b>sort</b>: emits ArrayList<K><br> + * <b>sorthash</b>: emits HashMap<K,Integer><br> + * <br> + * <br> + * </p> + * @displayName Sort Descending + * @category Stream Manipulators + * @tags rank, sort + * @deprecated + * @since 0.3.2 + */ +@Deprecated +// +// TODO: Override PriorityQueue and rewrite addAll to insert with location +// +@OperatorAnnotation(partitionable = false) +public class InsertSortDesc<K> extends AbstractBaseSortOperator<K> +{ + /** + * The input port on which individual tuples are received for sorting. + */ + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<K> data = new DefaultInputPort<K>() + { + /** + * Adds tuple to sorted queue + */ + @Override + public void process(K tuple) + { + processTuple(tuple); + } + }; + /** + * The input port on which lists of tuples are received for sorting.
+ */ + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<ArrayList<K>> datalist = new DefaultInputPort<ArrayList<K>>() + { + /** + * Adds tuples to sorted queue + */ + @Override + public void process(ArrayList<K> tuple) + { + processTuple(tuple); + } + }; + + /** + * The output port on which a sorted descending list of tuples is emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<ArrayList<K>> sort = new DefaultOutputPort<ArrayList<K>>(); + /** + * This output port emits a map from tuples to a count of the number of times each tuple occurred in the application window. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<HashMap<K, Integer>> sorthash = new DefaultOutputPort<HashMap<K, Integer>>(); + + @Override + public void initializeQueue() + { + pqueue = new PriorityQueue<K>(getSize(), new ReversibleComparator<K>(false)); + } + + + @Override + public void emitToList(ArrayList<K> list) + { + sort.emit(list); + } + + @Override + public void emitToHash(HashMap<K,Integer> map) + { + sorthash.emit(map); + } + + @Override + public boolean doEmitList() + { + return sort.isConnected(); + } + + @Override + public boolean doEmitHash() + { + return sorthash.isConnected(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndex.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndex.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndex.java new file mode 100644 index 0000000..7964ed7 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndex.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator.Unifier; +import com.datatorrent.api.annotation.OperatorAnnotation; + +import com.datatorrent.lib.util.BaseKeyValueOperator; + +/** + * This operator takes a stream of key value pairs each window, + * and outputs a set of inverted key value pairs at the end of each window. + * <p> + * Inverts the index and sends out the tuple on output port "index" at the end of the window. + * </p> + * <p> + * This is an end of window operator<br> + * <br> + * <b>StateFull : Yes, </b> tuple are compare across application window(s). <br> + * <b>Partitions : Yes, </b> inverted indexes are unified by instance of same operator. 
<br> + * <br> + * <br> + * <b>Ports</b>:<br> + * <b>data</b>: expects <K,V><br> + * <b>index</b>: emits <V,ArrayList<K>>(1); one HashMap per V<br> + * <br> + * </p> + * + * @displayName Invert Key Value Pairs + * @category Stream Manipulators + * @tags key value + * + * @since 0.3.2 + * @deprecated + */ +@Deprecated +@OperatorAnnotation(partitionable = true) +public class InvertIndex<K, V> extends BaseKeyValueOperator<K, V> implements Unifier<HashMap<V, ArrayList<K>>> +{ + /** + * Inverted key/value map. + */ + protected HashMap<V, ArrayList<K>> map = new HashMap<V, ArrayList<K>>(); + + /** + * The input port on which key value pairs are received. + */ + public final transient DefaultInputPort<HashMap<K, V>> data = new DefaultInputPort<HashMap<K, V>>() + { + /** + * Reverse indexes a HashMap<K, ArrayList<V>> tuple + */ + @Override + public void process(HashMap<K, V> tuple) + { + for (Map.Entry<K, V> e: tuple.entrySet()) { + if (e.getValue() == null) { // error tuple? + continue; + } + insert(e.getValue(), cloneKey(e.getKey())); + } + } + }; + + /** + * The output port on which inverted key value pairs are emitted. + */ + public final transient DefaultOutputPort<HashMap<V, ArrayList<K>>> index = new DefaultOutputPort<HashMap<V, ArrayList<K>>>() + { + @Override + public Unifier<HashMap<V, ArrayList<K>>> getUnifier() + { + return new InvertIndex<K, V>(); + } + }; + + /** + * + * Returns the ArrayList stored for a key + * + * @param key + */ + void insert(V val, K key) + { + ArrayList<K> list = map.get(val); + if (list == null) { + list = new ArrayList<K>(4); + map.put(cloneValue(val), list); + } + list.add(key); + } + + /** + * Emit all the data and clear the hash + * Clears internal data + */ + @Override + public void endWindow() + { + for (Map.Entry<V, ArrayList<K>> e: map.entrySet()) { + HashMap<V, ArrayList<K>> tuple = new HashMap<V, ArrayList<K>>(1); + tuple.put(e.getKey(), e.getValue()); + index.emit(tuple); + } + map.clear(); + } + + /** + * Unifier override. 
+ */ + @Override + public void process(HashMap<V, ArrayList<K>> tuple) + { + for (Map.Entry<V, ArrayList<K>> e: tuple.entrySet()) { + ArrayList<K> keys; + if (map.containsKey(e.getKey())) { + keys = map.remove(e.getKey()); + } else { + keys = new ArrayList<K>(); + } + keys.addAll(e.getValue()); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexArray.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexArray.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexArray.java new file mode 100644 index 0000000..26b77ac --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexArray.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OperatorAnnotation; + +import com.datatorrent.lib.util.BaseKeyValueOperator; + +/** + * This operator takes a stream of key value pairs each window, + * and outputs a set of inverted key value pairs at the end of each window. + * The values in the key value pairs received by this operator are an array lists, which may multiple values. + * <p> + * Inverts the index and sends out the tuple on output port "index" at the end of the window. + * </p> + * <p> + * This is an end of window operator<br> + * <br> + * <b>StateFull : Yes, </b> tuple are compare across application window(s). <br> + * <b>Partitions : Yes, </b> inverted indexes are unified by instance of same operator. <br> + * <br> + * <b>Ports</b>:<br> + * <b>data</b>: expects HashMap<K,ArrayList<V>><br> + * <b>index</b>: emits HashMap<V,ArrayList<K>>(1), one HashMap per V<br> + * <br> + * </p> + * + * @displayName Invert Key Value Pairs (Array) + * @category Stream Manipulators + * @tags key value + * + * @since 0.3.2 + * @deprecated + */ +@Deprecated +@OperatorAnnotation(partitionable = true) +public class InvertIndexArray<K, V> extends BaseKeyValueOperator<K,V> +{ + /** + * Inverted key/value map. + */ + protected HashMap<V, ArrayList<K>> map = new HashMap<V, ArrayList<K>>(); + + /** + * The input port on which key value pairs are received. + */ + public final transient DefaultInputPort<HashMap<K, ArrayList<V>>> data = new DefaultInputPort<HashMap<K, ArrayList<V>>>() + { + /** + * Reverse indexes a HashMap<K, ArrayList<V>> tuple + */ + @Override + public void process(HashMap<K, ArrayList<V>> tuple) + { + for (Map.Entry<K, ArrayList<V>> e: tuple.entrySet()) { + ArrayList<V> alist = e.getValue(); + if (alist == null) { // error tuple? 
+ continue; + } + for (V val : alist) { + insert(val, cloneKey(e.getKey())); + } + } + } + }; + + /** + * The output port or which inverted key value pairs are emitted. + */ + public final transient DefaultOutputPort<HashMap<V, ArrayList<K>>> index = new DefaultOutputPort<HashMap<V, ArrayList<K>>>() + { + @Override + public Unifier<HashMap<V, ArrayList<K>>> getUnifier() + { + return new InvertIndex<K, V>(); + } + }; + + /** + * + * Returns the ArrayList stored for a key + * + * @param key + */ + void insert(V val, K key) + { + ArrayList<K> list = map.get(val); + if (list == null) { + list = new ArrayList<K>(4); + map.put(cloneValue(val), list); + } + list.add(key); + } + + /** + * Emit all the data and clear the hash + */ + @Override + public void endWindow() + { + for (Map.Entry<V, ArrayList<K>> e: map.entrySet()) { + HashMap<V, ArrayList<K>> tuple = new HashMap<V, ArrayList<K>>(1); + tuple.put(e.getKey(), e.getValue()); + index.emit(tuple); + } + map.clear(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LastMatchMap.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LastMatchMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LastMatchMap.java new file mode 100644 index 0000000..188c9b1 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LastMatchMap.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.HashMap; +import java.util.Map; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OperatorAnnotation; + +import com.datatorrent.lib.util.BaseMatchOperator; + +/** + * This operator filters the incoming stream of key value pairs by obtaining the values corresponding to a specified key, + * and comparing those values to a specified value. The last key value pair, in each window, to satisfy the comparison is emitted. + * <p> + * A compare function is operated on a tuple value sub-classed from Number based on the property "key", "value", and "cmp". Every tuple + * is checked and the last one that passes the condition is send during end of window on port "last". The comparison is done by getting double + * value from the Number. + * </p> + * <p> + * This module is an end of window module<br> + * <br> + * <b>StateFull : Yes, </b> tuple are compare across application window(s). <br> + * <b>Partitions : No, </b> will yield wrong result. <br> + * <br> + * <b>Ports</b>:<br> + * <b>data</b>: expects Map<K,V extends Number><br> + * <b>last</b>: emits Map<K,V> in end of window for the last tuple on which the compare function is true<br> + * <br> + * <b>Properties</b>:<br> + * <b>key</b>: The key on which compare is done<br> + * <b>value</b>: The value to compare with<br> + * <b>cmp</b>: The compare function. Supported values are "lte", "lt", "eq", "neq", "gt", "gte". 
Default is "eq"<br> + * <br> + * <b>Specific compile time checks</b>:<br> + * Key must be non empty<br> + * Value must be able to convert to a "double"<br> + * Compare string, if specified, must be one of "lte", "lt", "eq", "neq", "gt", "gte"<br> + * <br> + * </p> + * + * @displayName Emit Last Match (Number) + * @category Rules and Alerts + * @tags filter, key value, numeric + * @deprecated + * @since 0.3.2 + */ +@Deprecated +@OperatorAnnotation(partitionable = false) +public class LastMatchMap<K, V extends Number> extends BaseMatchOperator<K,V> +{ + /** + * Last tuple. + */ + protected HashMap<K, V> ltuple = null; + + /** + * The input port on which key value pairs are received. + */ + public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>() + { + /** + * Processes tuples and keeps a copy of last matched tuple + */ + @Override + public void process(Map<K, V> tuple) + { + V val = tuple.get(getKey()); + if (val == null) { + return; + } + if (compareValue(val.doubleValue())) { + ltuple = cloneTuple(tuple); + } + } + }; + + /** + * The output port on which the last key value pair to satisfy the comparison function is emitted. 
+ */ + public final transient DefaultOutputPort<HashMap<K, V>> last = new DefaultOutputPort<HashMap<K, V>>(); + + /** + * Emits last matching tuple + */ + @Override + public void endWindow() + { + if (ltuple != null) { + last.emit(ltuple); + } + ltuple = null; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyMap.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyMap.java new file mode 100644 index 0000000..af5229c --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyMap.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.contrib.misc.algo; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; + +import com.datatorrent.lib.util.AbstractBaseFrequentKey; +import com.datatorrent.lib.util.UnifierArrayHashMapFrequent; +import com.datatorrent.lib.util.UnifierHashMapFrequent; + +/** + * This operator filters the incoming stream of key value pairs by finding the key or keys (if there is a tie) that occur the fewest number of times within each window. + * A list of the corresponding key value pairs are then output to the port named "list" and one of the corresponding key value pairs is output to the port "least", at the end of each window. + * <p> + * Occurrences of each key is counted and at the end of window any of the least frequent key is emitted on output port least and all least frequent + * keys on output port list. + * </p> + * <p> + * This module is an end of window module. In case of a tie any of the least key would be emitted. The list port would however have all the tied keys<br> + * <br> + * <b>StateFull : Yes, </b> tuple are compared across application window(s). <br> + * <b>Partitions : Yes, </b> least keys are unified on output port. 
<br> + * <br> + * <b>Ports</b>:<br> + * <b>data</b>: expects Map<K,V>, V is ignored/not used<br> + * <b>least</b>: emits HashMap<K,Integer>(1); where String is the least frequent key, and Integer is the number of its occurrences in the window<br> + * <b>list</b>: emits ArrayList<HashMap<K,Integer>(1)>; Where the list includes all the keys are least frequent<br> + * <br> + * </p> + * + * @displayName Emit Least Frequent Tuple Key + * @category Rules and Alerts + * @tags filter, key value, count + * @deprecated + * @since 0.3.2 + */ +@Deprecated +@OperatorAnnotation(partitionable = true) +public class LeastFrequentKeyMap<K, V> extends AbstractBaseFrequentKey<K> +{ + /** + * The input port on which key value pairs are received. + */ + public final transient DefaultInputPort<Map<K, V>> data = new DefaultInputPort<Map<K, V>>() + { + /** + * Calls super.processTuple(tuple) for each key in the HashMap + */ + @Override + public void process(Map<K, V> tuple) + { + for (Map.Entry<K, V> e: tuple.entrySet()) { + processTuple(e.getKey()); + } + } + }; + + /** + * The output port on which one of the tuples, + * which occurred the least number of times, + * is emitted. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<HashMap<K, Integer>> least = new DefaultOutputPort<HashMap<K, Integer>>() + { + @Override + public Unifier<HashMap<K, Integer>> getUnifier() + { + Unifier<HashMap<K, Integer>> ret = new UnifierHashMapFrequent<K>(); + ((UnifierHashMapFrequent<K>)ret).setLeast(true); + return ret; + } + }; + + /** + * The output port on which all the tuples, + * which occurred the least number of times, + * is emitted. 
+ */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<ArrayList<HashMap<K, Integer>>> list = new DefaultOutputPort<ArrayList<HashMap<K, Integer>>>() + { + @Override + public Unifier<ArrayList<HashMap<K, Integer>>> getUnifier() + { + Unifier<ArrayList<HashMap<K, Integer>>> ret = new UnifierArrayHashMapFrequent<K>(); + ((UnifierArrayHashMapFrequent<K>)ret).setLeast(true); + return ret; + } + }; + + /** + * Emits tuple on port "least" + * + * @param tuple + */ + @Override + public void emitTuple(HashMap<K, Integer> tuple) + { + least.emit(tuple); + } + + /** + * Emits tuple on port "list" + * + * @param tlist + */ + @Override + public void emitList(ArrayList<HashMap<K, Integer>> tlist) + { + list.emit(tlist); + } + + /** + * returns val1 < val2 + * + * @param val1 + * @param val2 + * @return val1 < val2 + */ + @Override + public boolean compareCount(int val1, int val2) + { + return val1 < val2; + } +}
