http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/resources/log4j.properties b/demos/highlevelapi/src/test/resources/log4j.properties new file mode 100644 index 0000000..592eb19 --- /dev/null +++ b/demos/highlevelapi/src/test/resources/log4j.properties @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +log4j.rootLogger=INFO,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.CONSOLE.threshold=INFO +#log4j.appender.CONSOLE.threshold=${test.log.console.threshold} +test.log.console.threshold=WARN + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +#log4j.logger.org=INFO + +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=INFO +log4j.logger.org.apache.apex=INFO
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/resources/sampletweets.txt ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/resources/sampletweets.txt b/demos/highlevelapi/src/test/resources/sampletweets.txt new file mode 100644 index 0000000..475fada --- /dev/null +++ b/demos/highlevelapi/src/test/resources/sampletweets.txt @@ -0,0 +1,207 @@ +Tweet content +"Apple has $233 billion in cash. It could buy all + +â@NFL teams +â@NBA teams +â@MLB teams +â@NHL teams + +...and still have $80 billion left. $AAPL" +Read $MRNJ #NEWS, $HEMP & $GRCU r above .01, that's where we r goinð $SPY $MSFT $SBUX $SFOR $VRX $AAPL $TSLA $GOOG $FB $EURUSD $USDJPY $MLCG +$RXSF is leadin their sector accordin 2 @EdisonMediaCen $AAPL $SPY $TSLA $FB $EURUSD $ALK $IBB $EW $AMZN $GBPUSD $GM https://t.co/LYY2mHn755 +Philstockworld Top Trade Review $AAPL $MSFT #Dividends $USO $HOV $TWTR -- https://t.co/JArXsIm7CI https://t.co/kRR9ezhm9E +Philstockworld Top Trade Review: $AAPL $ABX $BA $CAKE $CMG $DIS $IBM $GILD $LL $UNG $SPY -- https://t.co/EX5SYjdwBC https://t.co/7FBZwVZ63v +"Mondayâs Oil Mess: Rent-A-Rebel Jacks up Prices into the Holiday $USO $AAPL +#Earnings -- https://t.co/cGHB3WDKA8 https://t.co/JFZIBcom1n" +Meaningless Monday Market Movement! $AAPL $SQQQ #oil #Brexit https://t.co/j4Iqg7E1HN +"S&P Futures Back over 2,050, for Now +$SPY $AAPL $SQQQ #China #Debt #Hedging -- https://t.co/2dOc5T89S3 https://t.co/TDPVdNRNQF" +"ð¥TURN YOUR $500 INTO $5,000+ð¥ + +JOIN #TEAMBILLIONAIRE⤵ +ð§ [email protected] + +#PENNYSTOCKS $AAPL $GSAT $MGT +https://t.co/lwAGjfmIP3" +Trendless Tuesday - Watch Yesterdayâs Fake Gains Disappear $AAPL #China $FXI #Earnings -- https://t.co/GpgGqoOlFn https://t.co/FRuixv5aZF +"ð¥TURN YOUR $500 INTO $5,000+ð¥ + +JOIN #TEAMBILLIONAIRE⤵ +ð§ [email protected] + +#PENNYSTOCKS $AAPL $UVXY $JDST +https://t.co/lwAGjfmIP3" +"Apple has $233 billion in cash. 
It could buy: + +Uber +Tesla +Twitter +Airbnb +Netflix +Yahoo + +...and still have $18 billion left. $AAPL" +Option Opportunity Portfolio May Review â Up 19.3% In 30 Days! $ABX $FCX $USO $AAPL $DIS - https://t.co/rp3kMsRZ3E https://t.co/TKkc15pKcR +Waiting for the Fed â Apple Gives Us Huge Wins: $AAPL $SQQQ #GDP #Nikkei #Futures #Oil -- https://t.co/Al3pkf350V https://t.co/LktIRF4F2b +Tempting Tuesday - S&P 2,100 is Still the Line to Watch Ahead of the Fed $AAPL $QQQ -- https://t.co/t1eDfKHJnk https://t.co/BAW3RAe7SC +Our $SQQQ Hedge is Up 314% and Our Futures Are Up $4,850, You're Welcome! $AAPL -- https://t.co/eUQ2kCkCOY https://t.co/Yk98oyqMZl +"TURN YOUR ð²500 INTO ð²5,000$ð¥ + +JOIN #TEAMBILLIONAIRE ⤵ +ð§ [email protected] + +#PENNYSTOCKS $TWTR $AAPL $LNKD +https://t.co/euJFNQX1g4" +"TURN YOUR ð²500 INTO ð²5,000$ð¥ + +JOIN #TEAMBILLIONAIRE ⤵ +ð§ [email protected] + +#PENNYSTOCKS $TALK $PPPI $AAPL https://t.co/oSn11kxftM" +Bears today. We getting paid! $AAPL $TWTR $BWLD $NFLX https://t.co/CCi0S3skJJ +"Apple has $233 billion in cash. It could buy all + +â@NFL teams +â@NBA teams +â@MLB teams +â@NHL teams + +...and still have $80 billion left. $AAPL" +Are you in Sync with the market? https://t.co/ZtHHCrSAf8 #stocks #finance #investing #trading $AAPL $LNKD $NFLX $GOOGL $FB +The Last Time These Insiders Purchased This Stock It Sky Rocketed 1000%+ https://t.co/bmNAHBoQBD $DIA $QQQ $SPY $GOOG $AAPL $BAC $TWTR $FB +"This Hacker Made Amazonâs Alexa, Google Now, and Siri Command One Another +https://t.co/YXP3yqmf4H $AAPL $AMZN $GOOG https://t.co/NG7r6qgfRt" +"Over the last 3 years, the top 14 automakers upped their combined R&D spend by $192 million. + +$AAPL upped R&D spend by $5 billion. + +- MS" +Volatility can be your friend. 
https://t.co/aHz2r8HHD2 #stocks #trading #investing #financials #learntodaytrade $FB $AAPL $LNKD $NFLX +"PERCENTAGE of Apple's Revenues: +FY 2006: +iPod 40% +Mac 38% +Services 10% +Others 12% + +FY 2015: +iPhone 66% +Mac 11% +iPad 10% +Others 13% + +$AAPL" +Apple recovered $40 million worth of gold from recycled iPhones, iPads & Macs in 2015. https://t.co/XPBWlM6cBs $AAPL https://t.co/P0LMSRw7Ot +"Apple's iPhone sales sink for 1st time ever last quarter +https://t.co/TAKjUwl4Yc @DavidGoldmanCNN @cnntech $AAPL https://t.co/OrDp4BDpsD" +$BAC is down 5% since our article was posted on Friday https://t.co/al8AgaSsiI $DIA $QQQ $SPY $AAPL $GOOG $FB $TWTR $STUDY $NFLX $LNKD $IBM +Ben Franklin: The First Proponent Of Dividend Growth Investing? https://t.co/dx7FE2G9AH $AAPL $ACN $AL $BEN $CSV $HON $IJR $JNJ $JWN $PEGI +$5,000 Friday the 13th - Yesterday's Futures Trades Pay Off Nicely $USO $SPY $AAPL -- https://t.co/3RUEjAq1bO https://t.co/2L7cdebTlT +I DON'T SEE ANY BUBBLE RIGHT NOW , I SWEAR ! $SPX $SPY $DIA $DJI $AAPL $VIX $TVIX $C $BAC $GM $GE $FB #STOCKMARKET https://t.co/E5954RIpC7 +Terrible $AAPL quarter, finally. On the way to becoming $NOK. Tech is mean reverting, today's leaders are almost always tomorrow's laggards. +The iPhone 7S could look radically different from the iPhones of today https://t.co/eQxUMAZ4eM $AAPL https://t.co/HIH3QqKpIC +"No Bull: The Evidence +https://t.co/Md2SNpjdwd +$SPX $MSFT $GOOGL $AAPL $NFLX $AMZN $FB $DIS $V $BAC $GS $WMT $SBUX https://t.co/1oISHNX4cJ" +The iPhone 7S could look radically different from the iPhones of today https://t.co/KgeVSjmcGe $AAPL https://t.co/7hFtg37oJu +There was a 3rd Apple founder, Ronald Wayne, who sold his 10% stake for $800 in 1976. Today his share would've been worth $65 Billion. 
$AAPL +Twitter Stock Set to Breakout Soon https://t.co/u4V6ChhpOW $TWTR $DIA $QQQ $SPY $AAPL $GLD $GDX $NUGT $DUST $BAC $GOOG $FB $STUDY $NFLX $IBM +Alibaba Stock Price Breaks The 50 Day Moving Average https://t.co/ABOVWI6j2G $BABA $AAPL $YHOO $COST $UWTI $CSC $MON https://t.co/VlWGDxrQXh +I still canât shake the feeling that $AAPL is slowly taking themselves private. https://t.co/XIAMvppDWh https://t.co/kdMGCGbMaJ +$SPX ROADMAP 2016 #STOCKMARKET $INTC $F $SPY $AAPL $AMZN $C $VIX $FB $TWTR $GOOGL $UVXY $FAZ $FEZ $MSFT $GS $BAC $AA https://t.co/owuQ9awcDw +"Want to know why $GOOG is so impressive and why $AAPL is so fucked? Read this years founders' letter from $GOOG: + +https://t.co/LiBjGZwyKw" +"GET READY. Here are the companies reporting earnings next week: https://t.co/NXptPkQX70 + +$AAPL $FB $TWTR $CMG $GILD https://t.co/tcIoCZdOZi" +$SPX THIS TIME IT'S DIFFERENT! $SPY $DIA $SDOW $S $FAZ $FEZ $AAPL $MSFT $BAC $C $JPM $GS $SIRI $AMZN $F $VIX $TVIX https://t.co/pkYVgNKv3P +$SPX ROADMAP 2016 #STOCKS $TVIX $VXX $VALE $AAPL $AKS $FCX $MSFT $AA $MU $VIX $SPX $SPY #TRADING $PCLN $SIRI $ MCD https://t.co/6UH5He38h1 +The iPhone 6S is the first iPhone ever to sell fewer models than its predecessor https://t.co/s8iQOvPQeR $AAPL https://t.co/QmQROtQ9vY +11/ For example, buy an Echo and see your behavior change. The future is happening, and $AAPL seems, to me, asleep. +$RLYP $SPY $KORS $WDAY $MSFT $AAPL $QLIK $TIVO $NXPI $CPXX $AVGO $ZOES $LE $TICC $SLB $FCEL $VRA $MLNX $ASNA $ICPT https://t.co/LXMpz4rFG0 +#STOCKMARKET GRAVITY LESSONS: what goes up must come down $SPX $SPY $DIA $QQQ $TVIX $VIX $AAPL $C $FB $PCLN $BAC $F https://t.co/8HQHBEgSj5 +Should Icahn's exit or Buffett's entry affect your $AAPL judgment? The Big Name Effect. https://t.co/9Z2ok61MUh https://t.co/udAQLfdJFe +Apple revenue drops 13 percent, ending 13 years of growth. Greater China was especially weak, down 26 percent. $AAPL https://t.co/q4ovXUenBU +It was a $18 billion day for Apple. 
https://t.co/iRbGeoTmCJ $AAPL +"Apple has $233 billion in cash. It could buy: + +Uber +Tesla +Twitter +Airbnb +Netflix +Yahoo + +...and still have $18 billion left. $AAPL" +#3 TOP 2111.05 #STOCKS #STOCKMARKET #TRADING $SPX $SPY $VIX $TVIX $AAPL $SIRI $C $BAC $JPM $AMZN $MSFT $FB $TWTR $F https://t.co/gSqmN0fVON +Google #IO16: Android's failure to innovate hands a Apple free run at WWDC $GOOG $AAPL https://t.co/FTs9M8JD5g https://t.co/20uou1gUkW +$SPX 2134.72..2116.48...2111.05 HOUSTON WE HAVE A PROBLEM ! #STOCKMARKET $VIX $SPY $DIA $AAPL $C $BAC $FB $VXX $MSFT https://t.co/du3QfPUM4Q +top #earnings $FB $AAPL $AMZN $TWTR $CMG $F $GILD $LNKD $FCX $CELG $SWKS $JBLU $T $NXPI $BA https://t.co/lObOE0uRjZ https://t.co/94F6GJc3hE +The iPhone 6S is the first iPhone ever to sell fewer models than its predecessor https://t.co/ZVeQ9a4Yrh $AAPL https://t.co/2Ntpbxwlyo +You do not want to miss this incredibly candid look into $AAPL w/ @tim_cook! Tune into @MadMoneyOnCNBC on @CNBC now! https://t.co/budv4qfvju +Foxconn axes 60,000 jobs in one Chinese factory as robots take over: https://t.co/BnFdjGCmLf $AAPL https://t.co/WhRHer8jdN +Warren Buffett's Berkshire Hathaway reports 9.8M share stake in $AAPL https://t.co/nXmvK6PV7M https://t.co/MAcMz0iTg6 +Apple is about to report its worst quarter in 13 years on Tuesday https://t.co/NJ3hwunHCx $AAPL https://t.co/YLTmnpqNjI +Everyone who wants an iPhone has one. $AAPL is now a consumer staple stock and will trade on replacement / shareholder yield. +Financial Armageddonâ is imminent, the next major crash will happen in 2016 $VXX $VIX $TVIX $SPX $SPY $AAPL $MSFT $BAC $C $FB $DJI $DIA $F +"Apple is NO longer the largest US stock by market cap. Google is: https://t.co/i81Y83jQJC + +$GOOGL $AAPL https://t.co/cRCKRYBICS" +Exclusive: Apple hires former Tesla VP Chris Porritt for âspecial [car] projectâ https://t.co/7knsloxvJW $TSLA $AAPL https://t.co/X8cYztExoP +$SPX on the top of downtrend Channel Be careful! 
#STOCKMARKET $SPY $AAPL $AMZN $TSLA $FB $QQQ $DIA $NFLX $PCLN $C $F https://t.co/UKZCyLYuBq +UPDATE: Apple CEO Cook says in conference call that smartphone marker is 'currently not growing' $AAPL https://t.co/WeECmrdv1j +In February Charlie Munger was asked why Berkshire owns $GM. The $AAPL stake isn't anymore complicated than this: https://t.co/Rwkb30OEgq +Talking to @SquawkStreet about $AAPL & more at @NYSE ! https://t.co/m05b68VLMp +iPhone sales sour #Apple's earning: https://t.co/962fj9SWsc $AAPL https://t.co/nz9FRK6sNK +People arenât upgrading smartphones as quickly and that is bad for Apple https://t.co/EOEJPfNR8Z ð $AAPL +"$NXPI $JBLU $FCX $AAPL $CMG +$TWTR $EBAY $BWLD $PNRA $CRUS +$FB $FSLR $UPS $CELG $AMZN +$LNKD $BIDU $SWKS $GILD $HELE https://t.co/rQUmhHgYn0" +People mad that Icahn sold $AAPL without giving them the headâs up - How much in commissions did you pay him this year? +Cool stat: $AAPL's $46 billion loss in market cap overnight is greater than the market cap of 391 S&P 500 companies https://t.co/1ms1YZzTbP +Apple. You've come a long way... https://t.co/WGvk8K8MYv $AAPL https://t.co/3Wo0hAwRAc +"Someone is building the Internet's biggest list of stock market cliches and it's amazing: https://t.co/mIV169cF36 + +$SPY $AAPL $EURUSD" +JUST IN: Apple delays earnings release by one day, to April 26th after the bell. ⢠$AAPL +Apple's market value is down nearly three Twitters $AAPL $TWTR +Trump warns of a tech bubble: https://t.co/6Ks1yTa4Zc $AAPL $FB $AMZN He's 100% right about this. https://t.co/dJgTLk5JOB +Apple could sell its billionth iPhone in just a few months' time https://t.co/g6VYDFIE3d $AAPL https://t.co/jzucmxDYXe +$SPX KEEP BLOWING #STOCKMARKET #BUBBLE #STOCKS $MSFT $GS $AAPL $SPY $DIA $DJI $C $SIRI $PCLN $BAC $JPM $VIX $TVIX https://t.co/GPFBb0uCLF +Will Apple $AAPL fall from tree? 12-mo descending triangle. I've no interest to short it, but it will be wild ride https://t.co/AnjsIKmIHI +Tim Cook shouldn't be doing TV w/out a new product. 
Looks desperate. Not a consumer-facing guy. $AAPL https://t.co/Z4UFSimTLg +When will Apple will sell its billionth iPhone? It may be sooner than you think: https://t.co/5IaF018N1p $AAPL https://t.co/cCIgtKqWHA +#Stockmarket downtrend continues next week $spx $spy $vix $tvix $dji $aapl $jpm $bac $c $msft $pcln $wmt $ba https://t.co/1TTlgnKnZc +$AAPL https://t.co/AFANPYHnoq +40 years ago this month, Apple co-founder Ronald Wayne sold his 10% stake in $AAPL for $800. Current value: $61 billion. +Warren Buffett's Berkshire Hathaway reports 9.8M share stake in $AAPL https://t.co/rXWwuyIooI https://t.co/TztgKCcWWy +Apple's iBooks and iTunes Movies in China have been shut down after less than 7 months https://t.co/ZuGXZqSHma $AAPL https://t.co/1OHGC9YiUf +Possible buy on $AAPL as it drops onto it's 9 DEMA support #1Broker #Bitcoin #Blockchain https://t.co/WWssD01joh https://t.co/jOKJyG9EaJ +"Apple is down 7% after earnings. + +That's about $40 BILLION in market cap gone in 30 minutes. Poof. + +$AAPL: https://t.co/ggfmPjJjkW" +B4 CRASH 2008 - Paulson's speech:" OUR FINANCIAL SYSTEM IS STRONG" $VXX $VIX $TVIX $UVXY $SPX $SPY $AAPL $MSFT $BAC $C $FB $DJI $DIA $F +$ONCI is ready to RUN this week! #stockmarket #pennystocks #parabolic $CDNL $MGT $GOOGL $AAPL $TSLA $TWTR $ONCI https://t.co/wwqf0RNOix +Apple could sell its billionth iPhone in just a few months' time https://t.co/u2qFZ440dH $AAPL https://t.co/8cAchiZ0vC +The iPhone might radically change in 2017 $AAPL https://t.co/IXLdCfEdus https://t.co/GpdMvFZPjE +"The growth of smartphones. On one graph. + +A great share via: https://t.co/2hAJlarjSM + +$AAPL $GOOGL $MSFT https://t.co/BAwQRvYzou" +"$AAPL finished last quarter with $232 billion in cash, meanwhile Kanye running up debts making records for Tidal. + +Bro." +Which is bullish for $AAPL if you know anything about $GS https://t.co/WWssD01joh https://t.co/CQk8iKMI7w +"The tech stocks with the MOST revenue + +1. $AAPL +2. $AMZN +3. 
$MSFT + +Visual by @OphirGottlieb: https://t.co/GpZ5ct2z5r https://t.co/H6sNKdtBHd" + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/resources/wordcount/word.txt ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/test/resources/wordcount/word.txt b/demos/highlevelapi/src/test/resources/wordcount/word.txt new file mode 100644 index 0000000..edd0f51 --- /dev/null +++ b/demos/highlevelapi/src/test/resources/wordcount/word.txt @@ -0,0 +1,8 @@ +word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error +word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error +word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error +word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error +word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error +word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error +word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error +bye \ No newline at end of file http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/pom.xml ---------------------------------------------------------------------- diff --git a/demos/pom.xml b/demos/pom.xml index e9f2daf..3528e7a2 100644 --- a/demos/pom.xml +++ b/demos/pom.xml @@ -174,6 +174,7 @@ <id>all-modules</id> <modules> <module>distributedistinct</module> + <module>highlevelapi</module> </modules> </profile> </profiles> @@ -192,7 +193,6 @@ <module>r</module> <module>echoserver</module> <module>iteration</module> - <module>highlevelapi</module> </modules> <dependencies> @@ -232,17 +232,6 @@ </exclusion> </exclusions> </dependency> - <dependency> - 
<groupId>org.apache.apex</groupId> - <artifactId>malhar-stream</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java new file mode 100644 index 0000000..57db6d7 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import org.apache.apex.malhar.lib.window.Accumulation; +import org.apache.commons.lang3.tuple.MutablePair; + +/** + * Average Accumulation + */ +public class Average implements Accumulation<Double, MutablePair<Double, Long>, Double> +{ + @Override + public MutablePair<Double, Long> defaultAccumulatedValue() + { + return new MutablePair<>(0.0, 0L); + } + + @Override + public MutablePair<Double, Long> accumulate(MutablePair<Double, Long> accu, Double input) + { + accu.setLeft(accu.getLeft() * ((double)accu.getRight() / (accu.getRight() + 1)) + input / (accu.getRight() + 1)); + accu.setRight(accu.getRight() + 1); + return accu; + } + + @Override + public MutablePair<Double, Long> merge(MutablePair<Double, Long> accu1, MutablePair<Double, Long> accu2) + { + accu1.setLeft(accu1.getLeft() * ((double)accu1.getRight() / accu1.getRight() + accu2.getRight()) + + accu2.getLeft() * ((double)accu2.getRight() / accu1.getRight() + accu2.getRight())); + accu1.setRight(accu1.getRight() + accu2.getRight()); + return accu1; + } + + @Override + public Double getOutput(MutablePair<Double, Long> accumulatedValue) + { + return accumulatedValue.getLeft(); + } + + @Override + public Double getRetraction(Double value) + { + // TODO: Need to add implementation for retraction. 
+ return 0.0; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java new file mode 100644 index 0000000..2c01a0b --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import org.apache.apex.malhar.lib.window.Accumulation; +import org.apache.commons.lang3.mutable.MutableLong; + +/** + * Count Accumulation + */ +public class Count implements Accumulation<Long, MutableLong, Long> +{ + + @Override + public MutableLong defaultAccumulatedValue() + { + return new MutableLong(0); + } + + @Override + public MutableLong accumulate(MutableLong accumulatedValue, Long input) + { + accumulatedValue.add(input); + return accumulatedValue; + } + + @Override + public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2) + { + accumulatedValue1.add(accumulatedValue2); + return accumulatedValue1; + } + + @Override + public Long getOutput(MutableLong accumulatedValue) + { + return accumulatedValue.getValue(); + } + + @Override + public Long getRetraction(Long value) + { + return -value; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java new file mode 100644 index 0000000..5716cad --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import org.apache.apex.malhar.lib.window.Accumulation; + +/** + * Fold Accumulation Adaptor class + */ +public abstract class FoldFn<INPUT, OUTPUT> implements Accumulation<INPUT, OUTPUT, OUTPUT> +{ + + public FoldFn() + { + } + + public FoldFn(OUTPUT initialVal) + { + this.initialVal = initialVal; + } + + private OUTPUT initialVal; + + @Override + public OUTPUT defaultAccumulatedValue() + { + return initialVal; + } + + @Override + public OUTPUT accumulate(OUTPUT accumulatedValue, INPUT input) + { + return fold(accumulatedValue, input); + } + + @Override + public OUTPUT getOutput(OUTPUT accumulatedValue) + { + return accumulatedValue; + } + + @Override + public OUTPUT getRetraction(OUTPUT value) + { + return null; + } + + abstract OUTPUT fold(OUTPUT result, INPUT input); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java new file mode 100644 index 0000000..632cad5 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.apex.malhar.lib.window.Accumulation; + +/** + * Group accumulation. + */ +public class Group<T> implements Accumulation<T, List<T>, List<T>> +{ + @Override + public List<T> defaultAccumulatedValue() + { + return new ArrayList<>(); + } + + @Override + public List<T> accumulate(List<T> accumulatedValue, T input) + { + accumulatedValue.add(input); + return accumulatedValue; + } + + @Override + public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2) + { + accumulatedValue1.addAll(accumulatedValue2); + return accumulatedValue1; + } + + @Override + public List<T> getOutput(List<T> accumulatedValue) + { + return accumulatedValue; + } + + @Override + public List<T> getRetraction(List<T> value) + { + // TODO: Need to add implementation for retraction. 
+ return new ArrayList<>(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java new file mode 100644 index 0000000..1002b49 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import java.util.Comparator; +import org.apache.apex.malhar.lib.window.Accumulation; + +/** + * Max accumulation. 
+ */ +public class Max<T> implements Accumulation<T, T, T> +{ + + Comparator<T> comparator; + + public void setComparator(Comparator<T> comparator) + { + this.comparator = comparator; + } + + @Override + public T defaultAccumulatedValue() + { + return null; + } + + @Override + public T accumulate(T accumulatedValue, T input) + { + if (accumulatedValue == null) { + return input; + } else if (comparator != null) { + return (comparator.compare(input, accumulatedValue) > 0) ? input : accumulatedValue; + } else if (input instanceof Comparable) { + return (((Comparable)input).compareTo(accumulatedValue) > 0) ? input : accumulatedValue; + } else { + throw new RuntimeException("Tuple cannot be compared"); + } + } + + @Override + public T merge(T accumulatedValue1, T accumulatedValue2) + { + return accumulate(accumulatedValue1, accumulatedValue2); + } + + @Override + public T getOutput(T accumulatedValue) + { + return accumulatedValue; + } + + @Override + public T getRetraction(T value) + { + // TODO: Need to add implementation for retraction. + return null; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java new file mode 100644 index 0000000..66248f4 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import java.util.Comparator; +import org.apache.apex.malhar.lib.window.Accumulation; + +/** + * Min accumulation + */ +public class Min<T> implements Accumulation<T, T, T> +{ + + Comparator<T> comparator; + + public void setComparator(Comparator<T> comparator) + { + this.comparator = comparator; + } + + @Override + public T defaultAccumulatedValue() + { + return null; + } + + @Override + public T accumulate(T accumulatedValue, T input) + { + if (accumulatedValue == null) { + return input; + } else if (comparator != null) { + return (comparator.compare(input, accumulatedValue) < 0) ? input : accumulatedValue; + } else if (input instanceof Comparable) { + return (((Comparable)input).compareTo(accumulatedValue) < 0) ? input : accumulatedValue; + } else { + throw new RuntimeException("Tuple cannot be compared"); + } + } + + @Override + public T merge(T accumulatedValue1, T accumulatedValue2) + { + return accumulate(accumulatedValue1, accumulatedValue2); + } + + @Override + public T getOutput(T accumulatedValue) + { + return accumulatedValue; + } + + @Override + public T getRetraction(T value) + { + // TODO: Need to add implementation for retraction. 
+ return null; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java new file mode 100644 index 0000000..c21ab32 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import org.apache.apex.malhar.lib.window.Accumulation; + +/** + * An easy to use reduce Accumulation + * @param <INPUT> + */ +public abstract class ReduceFn<INPUT> implements Accumulation<INPUT, INPUT, INPUT> +{ + @Override + public INPUT defaultAccumulatedValue() + { + return null; + } + + @Override + public INPUT accumulate(INPUT accumulatedValue, INPUT input) + { + if (accumulatedValue == null) { + return input; + } + return reduce(accumulatedValue, input); + } + + @Override + public INPUT merge(INPUT accumulatedValue1, INPUT accumulatedValue2) + { + return reduce(accumulatedValue1, accumulatedValue2); + } + + @Override + public INPUT getOutput(INPUT accumulatedValue) + { + return accumulatedValue; + } + + @Override + public INPUT getRetraction(INPUT value) + { + return null; + } + + public abstract INPUT reduce(INPUT input1, INPUT input2); + + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java new file mode 100644 index 0000000..b7cd770 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.apex.malhar.lib.window.Accumulation; + +/** + * RemoveDuplicates Accumulation. + * @param <T> + */ +public class RemoveDuplicates<T> implements Accumulation<T, Set<T>, List<T>> +{ + @Override + public Set<T> defaultAccumulatedValue() + { + return new HashSet<>(); + } + + @Override + public Set<T> accumulate(Set<T> accumulatedValue, T input) + { + accumulatedValue.add(input); + return accumulatedValue; + } + + @Override + public Set<T> merge(Set<T> accumulatedValue1, Set<T> accumulatedValue2) + { + for (T item : accumulatedValue2) { + accumulatedValue1.add(item); + } + return accumulatedValue1; + } + + @Override + public List<T> getOutput(Set<T> accumulatedValue) + { + if (accumulatedValue == null) { + return new ArrayList<>(); + } else { + return new ArrayList<>(accumulatedValue); + } + } + + @Override + public List<T> getRetraction(List<T> value) + { + // TODO: Need to add implementation for retraction. 
+ return new ArrayList<>(value); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java new file mode 100644 index 0000000..60b195b --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import org.apache.apex.malhar.lib.window.Accumulation; +import org.apache.commons.lang.mutable.MutableDouble; + +/** + * Sum Accumulation for doubles. 
+ */ +public class SumDouble implements Accumulation<Double, MutableDouble, Double> +{ + @Override + public MutableDouble defaultAccumulatedValue() + { + return new MutableDouble(0.0); + } + + @Override + public MutableDouble accumulate(MutableDouble accumulatedValue, Double input) + { + accumulatedValue.add(input); + return accumulatedValue; + } + + @Override + public MutableDouble merge(MutableDouble accumulatedValue1, MutableDouble accumulatedValue2) + { + accumulatedValue1.add(accumulatedValue2); + return accumulatedValue1; + } + + @Override + public Double getOutput(MutableDouble accumulatedValue) + { + return accumulatedValue.doubleValue(); + } + + @Override + public Double getRetraction(Double value) + { + return -value; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java new file mode 100644 index 0000000..14e69e2 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import org.apache.apex.malhar.lib.window.Accumulation; +import org.apache.commons.lang.mutable.MutableFloat; + +/** + * Sum Accumulation for floats. + */ +public class SumFloat implements Accumulation<Float, MutableFloat, Float> +{ + @Override + public MutableFloat defaultAccumulatedValue() + { + return new MutableFloat(0.); + } + + @Override + public MutableFloat accumulate(MutableFloat accumulatedValue, Float input) + { + accumulatedValue.add(input); + return accumulatedValue; + } + + @Override + public MutableFloat merge(MutableFloat accumulatedValue1, MutableFloat accumulatedValue2) + { + accumulatedValue1.add(accumulatedValue2); + return accumulatedValue1; + } + + @Override + public Float getOutput(MutableFloat accumulatedValue) + { + return accumulatedValue.floatValue(); + } + + @Override + public Float getRetraction(Float value) + { + return -value; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java new file mode 100644 index 0000000..886a7d0 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import org.apache.apex.malhar.lib.window.Accumulation; +import org.apache.commons.lang.mutable.MutableInt; + +/** + * Sum accumulation for integers. + */ +public class SumInt implements Accumulation<Integer, MutableInt, Integer> +{ + @Override + public MutableInt defaultAccumulatedValue() + { + return new MutableInt(0); + } + + @Override + public MutableInt accumulate(MutableInt accumulatedValue, Integer input) + { + accumulatedValue.add(input); + return accumulatedValue; + } + + @Override + public MutableInt merge(MutableInt accumulatedValue1, MutableInt accumulatedValue2) + { + accumulatedValue1.add(accumulatedValue2); + return accumulatedValue1; + } + + @Override + public Integer getOutput(MutableInt accumulatedValue) + { + return accumulatedValue.intValue(); + } + + @Override + public Integer getRetraction(Integer value) + { + return -value; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java ---------------------------------------------------------------------- diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java new file mode 100644 index 0000000..469eef9 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import org.apache.apex.malhar.lib.window.Accumulation; +import org.apache.commons.lang.mutable.MutableLong; + +/** + * Sum accumulation for longs. 
+ */ +public class SumLong implements Accumulation<Long, MutableLong, Long> +{ + @Override + public MutableLong defaultAccumulatedValue() + { + return new MutableLong(0L); + } + + @Override + public MutableLong accumulate(MutableLong accumulatedValue, Long input) + { + accumulatedValue.add(input); + return accumulatedValue; + } + + @Override + public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2) + { + accumulatedValue1.add(accumulatedValue2); + return accumulatedValue1; + } + + @Override + public Long getOutput(MutableLong accumulatedValue) + { + return accumulatedValue.longValue(); + } + + @Override + public Long getRetraction(Long value) + { + return -value; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java new file mode 100644 index 0000000..7dad8cc --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.apex.malhar.lib.window.Accumulation; + +/** + * TopN accumulation + */ +public class TopN<T> implements Accumulation<T, List<T>, List<T>> +{ + int n; + + Comparator<T> comparator; + + public void setN(int n) + { + this.n = n; + } + + public void setComparator(Comparator<T> comparator) + { + this.comparator = comparator; + } + + @Override + public List<T> defaultAccumulatedValue() + { + return new LinkedList<>(); + } + + @Override + public List<T> accumulate(List<T> accumulatedValue, T input) + { + int k = 0; + for (T inMemory : accumulatedValue) { + if (comparator != null) { + if (comparator.compare(inMemory, input) < 0) { + break; + } + } else if (input instanceof Comparable) { + if (((Comparable<T>)input).compareTo(inMemory) > 0) { + break; + } + } else { + throw new RuntimeException("Tuple cannot be compared"); + } + k++; + } + accumulatedValue.add(k, input); + if (accumulatedValue.size() > n) { + accumulatedValue.remove(accumulatedValue.get(accumulatedValue.size() - 1)); + } + return accumulatedValue; + } + + @Override + public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2) + { + accumulatedValue1.addAll(accumulatedValue2); + if (comparator != null) { + Collections.sort(accumulatedValue1, Collections.reverseOrder(comparator)); + } else { + Collections.sort(accumulatedValue1, Collections.reverseOrder()); + } + if (accumulatedValue1.size() > n) { + return accumulatedValue1.subList(0, n); + } else { + return accumulatedValue1; + } + } + + @Override + public List<T> getOutput(List<T> accumulatedValue) + { + return accumulatedValue; + } + + @Override + public List<T> getRetraction(List<T> accumulatedValue) + { + return new LinkedList<>(); 
+ } +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java new file mode 100644 index 0000000..d9f9cfd --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.apex.malhar.lib.window.Accumulation; +import com.datatorrent.lib.util.KeyValPair; + +/** + * Generalized TopNByKey accumulation + */ +public class TopNByKey<K, V> implements + Accumulation<KeyValPair<K, V>, Map<K, V>, List<KeyValPair<K, V>>> +{ + int n = 10; + + Comparator<V> comparator; + + public void setN(int n) + { + this.n = n; + } + + public void setComparator(Comparator<V> comparator) + { + this.comparator = comparator; + } + + @Override + public Map<K, V> defaultAccumulatedValue() + { + return new HashMap<>(); + } + + @Override + public Map<K, V> accumulate(Map<K, V> accumulatedValue, KeyValPair<K, V> input) + { + accumulatedValue.put(input.getKey(), input.getValue()); + return accumulatedValue; + } + + @Override + public Map<K, V> merge(Map<K, V> accumulatedValue1, Map<K, V> accumulatedValue2) + { + for (Map.Entry<K, V> entry : accumulatedValue2.entrySet()) { + if (!accumulatedValue1.containsKey(entry.getKey())) { + accumulatedValue1.put(entry.getKey(), entry.getValue()); + } else if (comparator != null) { + if (comparator.compare(entry.getValue(), accumulatedValue1.get(entry.getKey())) > 0) { + accumulatedValue1.put(entry.getKey(), entry.getValue()); + } + } else if (entry.getValue() instanceof Comparable) { + if (((Comparable<V>)entry.getValue()).compareTo(accumulatedValue1.get(entry.getKey())) > 0) { + accumulatedValue1.put(entry.getKey(), entry.getValue()); + } + } + } + return accumulatedValue1; + } + + @Override + public List<KeyValPair<K, V>> getOutput(Map<K, V> accumulatedValue) + { + LinkedList<KeyValPair<K, V>> result = new LinkedList<>(); + for (Map.Entry<K, V> entry : accumulatedValue.entrySet()) { + int k = 0; + for (KeyValPair<K, V> inMemory : result) { + if (comparator != null) { + if (comparator.compare(entry.getValue(), 
inMemory.getValue()) > 0) { + break; + } + } else if (entry.getValue() instanceof Comparable) { + if (((Comparable<V>)entry.getValue()).compareTo(inMemory.getValue()) > 0) { + break; + } + } + k++; + } + result.add(k, new KeyValPair<K, V>(entry.getKey(), entry.getValue())); + if (result.size() > n) { + result.remove(result.get(result.size() - 1)); + } + } + return result; + } + + @Override + public List<KeyValPair<K, V>> getRetraction(List<KeyValPair<K, V>> value) + { + // TODO: Need to add implementation for retraction. + return new LinkedList<>(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/AverageTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/AverageTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/AverageTest.java new file mode 100644 index 0000000..fb4de3c --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/AverageTest.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import org.junit.Assert; +import org.junit.Test; +import org.apache.commons.lang3.tuple.MutablePair; + +/** + * Test for {@link Average}. + */ +public class AverageTest +{ + @Test + public void AverageTest() + { + Average ave = new Average(); + MutablePair<Double, Long> accu = ave.defaultAccumulatedValue(); + + for (int i = 1; i <= 10; i++) { + accu = ave.accumulate(accu, (double)i); + } + Assert.assertTrue(5.5 == accu.getLeft()); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFnTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFnTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFnTest.java new file mode 100644 index 0000000..4e6f8f1 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFnTest.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import org.junit.Assert; +import org.junit.Test; +import org.apache.apex.malhar.lib.window.Tuple; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.common.util.BaseOperator; + +/** + * Test for {@link ReduceFn}. + */ +public class FoldFnTest +{ + public static class NumGen extends BaseOperator implements InputOperator + { + public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>(); + + public static int count = 0; + private int i = 0; + + public NumGen() + { + count = 0; + i = 0; + } + + @Override + public void emitTuples() + { + while (i <= 7) { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + // Ignore it. + } + count++; + if (i >= 0) { + output.emit(i++); + } + } + i = -1; + } + } + + public static class Collector extends BaseOperator + { + private static int result; + + public transient DefaultInputPort<Tuple.WindowedTuple<Integer>> input = new DefaultInputPort<Tuple.WindowedTuple<Integer>>() + { + @Override + public void process(Tuple.WindowedTuple<Integer> tuple) + { + result = tuple.getValue(); + } + }; + + public int getResult() + { + return result; + } + } + + public static class Plus extends FoldFn<Integer, Integer> + { + @Override + public Integer merge(Integer accumulatedValue1, Integer accumulatedValue2) + { + return fold(accumulatedValue1, accumulatedValue2); + } + + @Override + public Integer fold(Integer input1, Integer input2) + { + if (input1 == null) { + return input2; + } + return input1 + input2; + } + } + + @Test + public void FoldFnTest() + { + + FoldFn<String, String> concat = new FoldFn<String, String>() + { + @Override + public String merge(String accumulatedValue1, String accumulatedValue2) + { + return fold(accumulatedValue1, accumulatedValue2); + } + + @Override + public String fold(String input1, String input2) + { + 
return input1 + ", " + input2; + } + }; + + String[] ss = new String[]{"b", "c", "d", "e"}; + String base = "a"; + + for (String s : ss) { + base = concat.accumulate(base, s); + } + Assert.assertEquals("a, b, c, d, e", base); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/GroupTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/GroupTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/GroupTest.java new file mode 100644 index 0000000..a9aac77 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/GroupTest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import java.util.List; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for {@link Group}. 
+ */ +public class GroupTest +{ + @Test + public void GroupTest() + { + Group<Integer> group = new Group<>(); + + List<Integer> accu = group.defaultAccumulatedValue(); + Assert.assertEquals(0, accu.size()); + Assert.assertEquals(1, group.accumulate(accu, 10).size()); + Assert.assertEquals(2, group.accumulate(accu, 11).size()); + Assert.assertEquals(3, group.accumulate(accu, 11).size()); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MaxTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MaxTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MaxTest.java new file mode 100644 index 0000000..c873125 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MaxTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import java.util.Comparator; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for Max accumulation + */ +public class MaxTest +{ + @Test + public void MaxTest() + { + Max<Integer> max = new Max<>(); + + Assert.assertEquals((Integer)5, max.accumulate(5, 3)); + Assert.assertEquals((Integer)6, max.accumulate(4, 6)); + Assert.assertEquals((Integer)5, max.merge(5, 2)); + + Comparator<Integer> com = new Comparator<Integer>() + { + @Override + public int compare(Integer o1, Integer o2) + { + return -(o1.compareTo(o2)); + } + }; + + max.setComparator(com); + Assert.assertEquals((Integer)3, max.accumulate(5, 3)); + Assert.assertEquals((Integer)4, max.accumulate(4, 6)); + Assert.assertEquals((Integer)2, max.merge(5, 2)); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MinTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MinTest.java new file mode 100644 index 0000000..74816b0 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MinTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import java.util.Comparator; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for {@link Min}: expects the smaller argument with no comparator set, the larger one under a reversed comparator. + */ +public class MinTest +{ + @Test + public void MinTest() + { + Min<Integer> min = new Min<>(); + + Assert.assertEquals((Integer)3, min.accumulate(5, 3)); + Assert.assertEquals((Integer)4, min.accumulate(4, 6)); + Assert.assertEquals((Integer)2, min.merge(5, 2)); + + Comparator<Integer> com = new Comparator<Integer>() + { + @Override + public int compare(Integer o1, Integer o2) + { + return -(o1.compareTo(o2)); + } + }; + + min.setComparator(com); + Assert.assertEquals((Integer)5, min.accumulate(5, 3)); + Assert.assertEquals((Integer)6, min.accumulate(4, 6)); + Assert.assertEquals((Integer)5, min.merge(5, 2)); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFnTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFnTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFnTest.java new file mode 100644 index 0000000..6b5bbad --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFnTest.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for {@link ReduceFn}: accumulating "b" to "e" onto "a" with a comma-joining reduce must yield "a, b, c, d, e". + */ +public class ReduceFnTest +{ + + @Test + public void ReduceFnTest() + { + ReduceFn<String> concat = new ReduceFn<String>() + { + @Override + public String reduce(String input1, String input2) + { + return input1 + ", " + input2; + } + }; + + String[] ss = new String[]{"b", "c", "d", "e"}; + String base = "a"; + + for (String s : ss) { + base = concat.accumulate(base, s); + } + Assert.assertEquals("a, b, c, d, e", base); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicatesTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicatesTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicatesTest.java new file mode 100644 index 0000000..f0196d2 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicatesTest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one +
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import java.util.Set; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for {@link RemoveDuplicates}: the accumulated set starts empty, grows per new value and ignores a repeated one. + */ +public class RemoveDuplicatesTest +{ + @Test + public void RemoveDuplicatesTest() + { + RemoveDuplicates<Integer> rd = new RemoveDuplicates<>(); + + Set<Integer> accu = rd.defaultAccumulatedValue(); + Assert.assertEquals(0, accu.size()); + Assert.assertEquals(1, rd.accumulate(accu, 10).size()); + Assert.assertEquals(2, rd.accumulate(accu, 11).size()); + Assert.assertEquals(2, rd.accumulate(accu, 11).size()); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumTest.java new file mode 100644 index 0000000..65b6480 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import org.junit.Assert; +import org.junit.Test; +import org.apache.commons.lang.mutable.MutableDouble; +import org.apache.commons.lang.mutable.MutableFloat; +import org.apache.commons.lang.mutable.MutableInt; +import org.apache.commons.lang.mutable.MutableLong; + +/** + * Test for the Sum accumulations (int, long, float, double): accumulate from the default value, accumulate onto a value, merge.
+ */ +public class SumTest +{ + @Test + public void SumTest() + { + SumInt si = new SumInt(); + SumLong sl = new SumLong(); + SumFloat sf = new SumFloat(); + SumDouble sd = new SumDouble(); + + Assert.assertEquals(new MutableInt(10), si.accumulate(si.defaultAccumulatedValue(), 10)); + Assert.assertEquals(new MutableInt(11), si.accumulate(new MutableInt(1), 10)); + Assert.assertEquals(new MutableInt(22), si.merge(new MutableInt(1), new MutableInt(21))); + + Assert.assertEquals(new MutableLong(10L), sl.accumulate(sl.defaultAccumulatedValue(), 10L)); + Assert.assertEquals(new MutableLong(22L), sl.accumulate(new MutableLong(2L), 20L)); + Assert.assertEquals(new MutableLong(41L), sl.merge(new MutableLong(32L), new MutableLong(9L))); + + Assert.assertEquals(new MutableFloat(9.0F), sf.accumulate(sf.defaultAccumulatedValue(), 9.0F)); + Assert.assertEquals(new MutableFloat(22.5F), sf.accumulate(new MutableFloat(2.5F), 20F)); + Assert.assertEquals(new MutableFloat(41.0F), sf.merge(new MutableFloat(33.1F), new MutableFloat(7.9F))); + + Assert.assertEquals(new MutableDouble(9.0), sd.accumulate(sd.defaultAccumulatedValue(), 9.0)); + Assert.assertEquals(new MutableDouble(22.5), sd.accumulate(new MutableDouble(2.5), 20.0)); + Assert.assertEquals(new MutableDouble(41.0), sd.merge(new MutableDouble(33.1), new MutableDouble(7.9))); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKeyTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKeyTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKeyTest.java new file mode 100644 index 0000000..3f6ac09 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKeyTest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl.accumulation; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; + +import com.datatorrent.lib.util.KeyValPair; + +/** + * Unit test for {@link TopNByKey}: with N = 3, getOutput must list at most the three largest pairs, largest first. + */ +public class TopNByKeyTest +{ + @Test + public void TopNByKeyTest() throws Exception + { + TopNByKey<String, Integer> topNByKey = new TopNByKey<>(); + topNByKey.setN(3); + Map<String, Integer> accu = topNByKey.defaultAccumulatedValue(); + + Assert.assertEquals(0, accu.size()); + + accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("1", 1)); + accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("3", 3)); + + List<KeyValPair<String, Integer>> result1 = new ArrayList<>(); + + result1.add(new KeyValPair<String, Integer>("3", 3)); + result1.add(new KeyValPair<String, Integer>("1", 1)); + + Assert.assertEquals(result1, topNByKey.getOutput(accu)); + + accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("2", 2)); + + List<KeyValPair<String, Integer>> result2 = new ArrayList<>(); + + result2.add(new KeyValPair<String, Integer>("3", 3)); + result2.add(new KeyValPair<String, Integer>("2", 
2)); + result2.add(new KeyValPair<String, Integer>("1", 1)); + + Assert.assertEquals(result2, topNByKey.getOutput(accu)); + + accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("5", 5)); + accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("4", 4)); + + List<KeyValPair<String, Integer>> result3 = new ArrayList<>(); + + result3.add(new KeyValPair<String, Integer>("5", 5)); + result3.add(new KeyValPair<String, Integer>("4", 4)); + result3.add(new KeyValPair<String, Integer>("3", 3)); + + Assert.assertEquals(result3, topNByKey.getOutput(accu)); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java index bc99035..84f05fc 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java @@ -27,9 +27,9 @@ import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl; import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl; +import org.apache.apex.malhar.lib.window.impl.accumulation.FoldFn; +import org.apache.apex.malhar.lib.window.impl.accumulation.ReduceFn; import org.apache.apex.malhar.stream.api.function.Function; -import org.apache.apex.malhar.stream.api.impl.accumulation.FoldFn; -import org.apache.apex.malhar.stream.api.impl.accumulation.ReduceFn; import org.apache.hadoop.classification.InterfaceStability; import com.datatorrent.lib.util.KeyValPair; 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java index a293ea8..ebd5eea 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java @@ -32,15 +32,16 @@ import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage; import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage; import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl; import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl; +import org.apache.apex.malhar.lib.window.impl.accumulation.Count; +import org.apache.apex.malhar.lib.window.impl.accumulation.FoldFn; +import org.apache.apex.malhar.lib.window.impl.accumulation.ReduceFn; +import org.apache.apex.malhar.lib.window.impl.accumulation.TopN; + import org.apache.apex.malhar.stream.api.ApexStream; import org.apache.apex.malhar.stream.api.Option; import org.apache.apex.malhar.stream.api.WindowedStream; import org.apache.apex.malhar.stream.api.function.Function; -import org.apache.apex.malhar.stream.api.impl.accumulation.Count; -import org.apache.apex.malhar.stream.api.impl.accumulation.FoldFn; -import org.apache.apex.malhar.stream.api.impl.accumulation.ReduceFn; -import org.apache.apex.malhar.stream.api.impl.accumulation.TopN; import org.apache.commons.lang3.mutable.MutableLong; import org.apache.hadoop.classification.InterfaceStability; @@ -62,7 +63,7 @@ public class ApexWindowedStreamImpl<T> extends ApexStreamImpl<T> implements Wind protected Duration allowedLateness; - private class ConvertFn<T> implements 
Function.MapFunction<T, Tuple<T>> + private static class ConvertFn<T> implements Function.MapFunction<T, Tuple<T>> { @Override http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/Count.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/Count.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/Count.java deleted file mode 100644 index 68f1b9e..0000000 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/Count.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.apex.malhar.stream.api.impl.accumulation; - -import org.apache.apex.malhar.lib.window.Accumulation; -import org.apache.commons.lang3.mutable.MutableLong; - -/** - * Count Accumulation: adds each Long input to a MutableLong total that starts at 0; the retraction is the negated value. - */ -public class Count implements Accumulation<Long, MutableLong, Long> -{ - - @Override - public MutableLong defaultAccumulatedValue() - { - return new MutableLong(0); - } - - @Override - public MutableLong accumulate(MutableLong accumulatedValue, Long input) - { - accumulatedValue.add(input); - return accumulatedValue; - } - - @Override - public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2) - { - accumulatedValue1.add(accumulatedValue2); - return accumulatedValue1; - } - - @Override - public Long getOutput(MutableLong accumulatedValue) - { - return accumulatedValue.getValue(); - } - - @Override - public Long getRetraction(Long value) - { - return -value; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/FoldFn.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/FoldFn.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/FoldFn.java deleted file mode 100644 index 3ab6892..0000000 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/FoldFn.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.api.impl.accumulation; - -import org.apache.apex.malhar.lib.window.Accumulation; - -/** - * Fold Accumulation Adaptor class: accumulate delegates to fold(), starting from the constructor-supplied initial value. - */ -public abstract class FoldFn<INPUT, OUTPUT> implements Accumulation<INPUT, OUTPUT, OUTPUT> -{ - - public FoldFn() - { - } - - public FoldFn(OUTPUT initialVal) - { - this.initialVal = initialVal; - } - - private OUTPUT initialVal; - - @Override - public OUTPUT defaultAccumulatedValue() - { - return initialVal; - } - - @Override - public OUTPUT accumulate(OUTPUT accumulatedValue, INPUT input) - { - return fold(accumulatedValue, input); - } - - @Override - public OUTPUT getOutput(OUTPUT accumulatedValue) - { - return accumulatedValue; - } - - @Override - public OUTPUT getRetraction(OUTPUT value) - { - return null; - } - - abstract OUTPUT fold(OUTPUT result, INPUT input); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/ReduceFn.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/ReduceFn.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/ReduceFn.java deleted file mode 100644 index b4507bc..0000000 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/ReduceFn.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.api.impl.accumulation; - -import org.apache.apex.malhar.lib.window.Accumulation; - -/** - * An easy to use reduce Accumulation: the first input becomes the accumulated value, later ones go through reduce(). - * @param <INPUT> type shared by the input, the accumulated value and the output - */ -public abstract class ReduceFn<INPUT> implements Accumulation<INPUT, INPUT, INPUT> -{ - @Override - public INPUT defaultAccumulatedValue() - { - return null; - } - - @Override - public INPUT accumulate(INPUT accumulatedValue, INPUT input) - { - if (accumulatedValue == null) { - return input; - } - return reduce(accumulatedValue, input); - } - - @Override - public INPUT merge(INPUT accumulatedValue1, INPUT accumulatedValue2) - { - return reduce(accumulatedValue1, accumulatedValue2); - } - - @Override - public INPUT getOutput(INPUT accumulatedValue) - { - return accumulatedValue; - } - - @Override - public INPUT getRetraction(INPUT value) - { - return null; - } - - public abstract INPUT reduce(INPUT input1, INPUT input2); - - -}
