Repository: apex-malhar Updated Branches: refs/heads/master f2bc30fce -> 611701c3d
APEXMALHAR-2453 Added Sort Accumulation for Windowed operators Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/611701c3 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/611701c3 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/611701c3 Branch: refs/heads/master Commit: 611701c3df6bf95148fc9e3a6445efeee26581f4 Parents: f2bc30f Author: ajaygit158 <[email protected]> Authored: Tue Mar 21 17:20:19 2017 +0530 Committer: ajaygit158 <[email protected]> Committed: Wed Mar 29 12:09:04 2017 +0530 ---------------------------------------------------------------------- .../malhar/lib/window/accumulation/Sort.java | 106 ++++++++++++ .../lib/window/accumulation/SortTest.java | 160 +++++++++++++++++++ 2 files changed, 266 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/611701c3/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Sort.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Sort.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Sort.java new file mode 100644 index 0000000..7dc684f --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Sort.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Sort accumulation: collects input tuples into a {@link List} that is kept sorted by a
+ * user-supplied {@link Comparator}, or by its reverse.
+ *
+ * <p>Duplicates are retained. Tuples that compare as equal keep the order in which they were
+ * accumulated, and when two accumulated values are merged, equal tuples from the first list
+ * precede those from the second.</p>
+ *
+ * <p>The no-argument constructor exists only for Kryo deserialization; if no comparator has been
+ * set, {@link #accumulate} and {@link #merge} throw {@link IllegalStateException}.</p>
+ *
+ * @param <T> tuple type
+ */
+@InterfaceStability.Evolving
+public class Sort<T> implements Accumulation<T, List<T>, List<T>>
+{
+  /** When true, tuples are ordered opposite to {@link #comparator}. */
+  boolean reverseSort;
+  /** Ordering of the tuples; required by {@link #accumulate} and {@link #merge}. */
+  Comparator<T> comparator;
+  /** Reverse of {@link #comparator}, precomputed by the two-argument constructor. */
+  Comparator<T> reverseComparator;
+
+  public Sort()
+  {
+    //for kryo
+  }
+
+  /**
+   * @param reverseSort
+   *          sort in order opposite to how the comparator would sort
+   * @param comparator
+   *          comparator to sort the tuples
+   */
+  public Sort(final boolean reverseSort, final Comparator<T> comparator)
+  {
+    this.reverseSort = reverseSort;
+    this.comparator = comparator;
+    this.reverseComparator = Collections.reverseOrder(comparator);
+  }
+
+  /**
+   * @return a new, empty, mutable list
+   */
+  @Override
+  public List<T> defaultAccumulatedValue()
+  {
+    return new ArrayList<T>();
+  }
+
+  /**
+   * Inserts {@code input} at its sorted position, after any tuples that compare as equal to it.
+   *
+   * @param accumulatedValue list already sorted in this accumulation's order; modified in place
+   * @param input tuple to add
+   * @return {@code accumulatedValue}
+   * @throws IllegalStateException if no comparator has been set
+   */
+  @Override
+  public List<T> accumulate(List<T> accumulatedValue, T input)
+  {
+    insertElement(accumulatedValue, input, getSortComparator());
+    return accumulatedValue;
+  }
+
+  /**
+   * Merges two sorted accumulated values into {@code accumulatedValue1}.
+   *
+   * @param accumulatedValue1 sorted list that receives the result; modified in place
+   * @param accumulatedValue2 sorted list whose tuples are added; not modified
+   * @return {@code accumulatedValue1}, containing the tuples of both lists in sorted order
+   * @throws IllegalStateException if no comparator has been set
+   */
+  @Override
+  public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2)
+  {
+    Comparator<T> order = getSortComparator();
+    // Both lists are already sorted, so after concatenation the stable merge sort behind
+    // Collections.sort sees two sorted runs and merges them in roughly linear time, instead of
+    // the O(n * m) cost of inserting element by element. ArrayList.addAll (the default
+    // accumulated value) snapshots its argument, so merging a list with itself is also safe.
+    accumulatedValue1.addAll(accumulatedValue2);
+    Collections.sort(accumulatedValue1, order);
+    return accumulatedValue1;
+  }
+
+  /**
+   * Inserts {@code element} into the sorted list after the last element that compares as equal
+   * to it, so equal tuples keep their arrival order.
+   */
+  private void insertElement(List<T> accumulatedValue, T element, Comparator<T> order)
+  {
+    // binarySearch returns the index of *some* equal element when one exists,
+    // otherwise (-(insertion point) - 1)
+    int index = Collections.binarySearch(accumulatedValue, element, order);
+    if (index < 0) {
+      index = -index - 1;
+    } else {
+      // step past the remainder of the run of equal elements
+      int size = accumulatedValue.size();
+      while (index < size && order.compare(accumulatedValue.get(index), element) == 0) {
+        index++;
+      }
+    }
+    accumulatedValue.add(index, element);
+  }
+
+  /**
+   * @return the comparator defining this accumulation's order: {@link #comparator}, or its
+   *         reverse when {@link #reverseSort} is set
+   * @throws IllegalStateException if no comparator has been set
+   */
+  private Comparator<T> getSortComparator()
+  {
+    if (comparator == null) {
+      throw new IllegalStateException("Comparator not provided, Tuple cannot be compared");
+    }
+    return reverseSort ? reverseComparator : comparator;
+  }
+
+  /**
+   * @return {@code accumulatedValue} itself, not a copy
+   */
+  @Override
+  public List<T> getOutput(List<T> accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+
+  /**
+   * @return a new, empty list
+   */
+  @Override
+  public List<T> getRetraction(List<T> accumulatedValue)
+  {
+    return new ArrayList<T>();
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/611701c3/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SortTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SortTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SortTest.java
new file mode 100644
index 0000000..daeb4fc
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SortTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for Sort accumulation
+ */
+public class SortTest
+{
+  public static class TestPojo1
+  {
+    private int uId;
+    private String uName;
+
+    public TestPojo1()
+    {
+
+    }
+
+    public TestPojo1(int id, String name)
+    {
+      this.uId = id;
+      this.uName = name;
+    }
+
+    public int getUId()
+    {
+      return uId;
+    }
+
+    public void setUId(int uId)
+    {
+      this.uId = uId;
+    }
+
+    public String getUName()
+    {
+      return uName;
+    }
+
+    public void setUName(String uName)
+    {
+      this.uName = uName;
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
+      TestPojo1 other = (TestPojo1)obj;
+      return uId == other.uId && Objects.equals(uName, other.uName);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      // must use the same fields as equals() to honour the equals/hashCode contract
+      return Objects.hash(uId, uName);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "TestPojo1{uId=" + uId + ", uName=" + uName + "}";
+    }
+  }
+
+  /** Orders by id, then by name; a null tuple sorts before any non-null tuple. */
+  private static final Comparator<TestPojo1> ID_THEN_NAME = new Comparator<TestPojo1>()
+  {
+    @Override
+    public int compare(TestPojo1 o1, TestPojo1 o2)
+    {
+      if (o1 == null && o2 == null) {
+        return 0;
+      } else if (o1 == null) {
+        return -1;
+      } else if (o2 == null) {
+        return 1;
+      } else if (o1.getUId() != o2.getUId()) {
+        // Integer.compare instead of subtraction, which can overflow for large ids
+        return Integer.compare(o1.getUId(), o2.getUId());
+      } else {
+        return o1.getUName().compareTo(o2.getUName());
+      }
+    }
+  };
+
+  private final TestPojo1 o1 = new TestPojo1(5, "user1");
+  private final TestPojo1 o2 = new TestPojo1(15, "user32");
+  private final TestPojo1 o3 = new TestPojo1(5, "user11");
+  private final TestPojo1 o4 = new TestPojo1(2, "user12");
+  private final TestPojo1 o5 = new TestPojo1(15, "user32");
+
+  /** o1..o5 in ascending ID_THEN_NAME order (o2 and o5 are equal). */
+  private final List<TestPojo1> ascList = Arrays.asList(o4, o1, o3, o2, o5);
+
+  @Test
+  public void testSortAscDesc()
+  {
+    // comparing whole lists also checks the size, which an element-by-element loop would not
+    Sort<TestPojo1> sort = new Sort<>(false, ID_THEN_NAME);
+    Assert.assertEquals(ascList, accumulateAll(sort, o1, o2, o3, o4, o5));
+
+    sort = new Sort<>(true, ID_THEN_NAME);
+    Assert.assertEquals(reversed(ascList), accumulateAll(sort, o1, o2, o3, o4, o5));
+  }
+
+  @Test
+  public void testMerge()
+  {
+    Sort<TestPojo1> sort = new Sort<>(false, ID_THEN_NAME);
+    List<TestPojo1> merged = sort.merge(accumulateAll(sort, o1, o2), accumulateAll(sort, o3, o4, o5));
+    Assert.assertEquals(ascList, merged);
+
+    sort = new Sort<>(true, ID_THEN_NAME);
+    merged = sort.merge(accumulateAll(sort, o1, o2), accumulateAll(sort, o3, o4, o5));
+    Assert.assertEquals(reversed(ascList), merged);
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testAccumulateWithoutComparator()
+  {
+    Sort<TestPojo1> sort = new Sort<>();
+    sort.accumulate(sort.defaultAccumulatedValue(), o1);
+  }
+
+  /** Accumulates {@code tuples}, in the given order, into a fresh accumulated value. */
+  private static List<TestPojo1> accumulateAll(Sort<TestPojo1> sort, TestPojo1... tuples)
+  {
+    List<TestPojo1> accumulatedValue = sort.defaultAccumulatedValue();
+    for (TestPojo1 tuple : tuples) {
+      accumulatedValue = sort.accumulate(accumulatedValue, tuple);
+    }
+    return accumulatedValue;
+  }
+
+  /** Returns a reversed copy of {@code list}. */
+  private static List<TestPojo1> reversed(List<TestPojo1> list)
+  {
+    List<TestPojo1> copy = new ArrayList<>(list);
+    Collections.reverse(copy);
+    return copy;
+  }
+}
