Repository: kylin Updated Branches: refs/heads/2.x-staging 5f4c581e6 -> f4667226d
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kylin.measure.topn;

/**
 * Modified from DoublyLinkedList.java in https://github.com/addthis/stream-lib
 *
 * <p>A minimal doubly linked list used by {@link TopNCounter}. Note the
 * unconventional link orientation: following {@code next} moves <em>from the
 * tail toward the head</em>, and following {@code prev} moves from the head
 * toward the tail. Consequently {@code head.next == null} and
 * {@code tail.prev == null}, and a head-to-tail traversal uses
 * {@code getPrev()} (as the callers in TopNCounter do).
 *
 * <p>Not thread-safe.
 *
 * @param <T> element type stored in the list nodes
 */
public class DoublyLinkedList<T> {

    // Number of nodes currently linked into the list.
    private int size = 0;
    // End of the list reached by following next == null from head direction;
    // enqueue() prepends here.
    private ListNode2<T> tail;
    // End of the list reached by following prev == null from tail direction;
    // add() appends here.
    private ListNode2<T> head;

    /**
     * Append to head of list.
     *
     * @param value payload for the new node
     * @return the newly created node (so the caller can keep a handle to it)
     */
    public ListNode2<T> add(T value) {
        ListNode2<T> node = new ListNode2<T>(value);
        add(node);

        return node;
    }

    /**
     * Prepend to tail of list.
     *
     * @param value payload for the new node
     * @return the newly created node
     */
    public ListNode2<T> enqueue(T value) {
        ListNode2<T> node = new ListNode2<T>(value);

        return enqueue(node);
    }

    /**
     * Prepend an existing node at the tail of the list.
     * NOTE(review): assumes {@code node} is detached (prev/next are not
     * cleared here); TopNCounter nulls both pointers before re-linking.
     */
    public ListNode2<T> enqueue(ListNode2<T> node) {
        if (size++ == 0) {
            // First element: it is both head and tail.
            head = node;
        } else {
            // New tail points "inward" (toward head) via next.
            node.next = tail;
            tail.prev = node;
        }

        tail = node;

        return node;
    }

    /**
     * Append an existing node at the head of the list.
     */
    public void add(ListNode2<T> node) {
        node.prev = head;
        node.next = null; // head has no next by construction

        if (size++ == 0) {
            tail = node;
        } else {
            head.next = node;
        }

        head = node;
    }

    /**
     * Insert a new node holding {@code value} on the head side of {@code node}.
     *
     * @return the newly created node
     */
    public ListNode2<T> addAfter(ListNode2<T> node, T value) {
        ListNode2<T> newNode = new ListNode2<T>(value);
        addAfter(node, newNode);
        return newNode;
    }

    /**
     * Insert {@code newNode} on the head side of {@code node}
     * ("after" in the tail-to-head direction of {@code next}).
     */
    public void addAfter(ListNode2<T> node, ListNode2<T> newNode) {
        newNode.next = node.next;
        newNode.prev = node;
        node.next = newNode;
        if (newNode.next == null) {
            // Inserted past the old head, so newNode becomes the head.
            head = newNode;
        } else {
            newNode.next.prev = newNode;
        }
        size++;
    }

    /**
     * Insert {@code newNode} on the tail side of {@code node}
     * ("before" in the tail-to-head direction of {@code next}).
     */
    public void addBefore(ListNode2<T> node, ListNode2<T> newNode) {
        newNode.prev = node.prev;
        newNode.next = node;
        node.prev = newNode;
        if (newNode.prev == null) {
            // Inserted past the old tail, so newNode becomes the tail.
            tail = newNode;
        } else {
            newNode.prev.next = newNode;
        }
        size++;
    }

    /**
     * Unlink {@code node} from the list. The node's own prev/next pointers are
     * left untouched; callers that reuse the node must clear them.
     */
    public void remove(ListNode2<T> node) {
        if (node == tail) {
            // The neighbor toward the head becomes the new tail.
            tail = node.next;
        } else {
            node.prev.next = node.next;
        }

        if (node == head) {
            // The neighbor toward the tail becomes the new head.
            head = node.prev;
        } else {
            node.next.prev = node.prev;
        }
        size--;
    }

    /** @return number of nodes in the list */
    public int size() {
        return size;
    }

    /** @return the head node (where add() appends), or null if empty */
    public ListNode2<T> head() {
        return head;
    }

    /** @return the tail node (where enqueue() prepends), or null if empty */
    public ListNode2<T> tail() {
        return tail;
    }

    /** @return true if the list contains no nodes */
    public boolean isEmpty() {
        return size == 0;
    }

}
+*/ + +package org.apache.kylin.measure.topn; + +/** + * Modified from ListNode2.java in https://github.com/addthis/stream-lib + * + * @param <T> + */ +public class ListNode2<T> { + + protected T value; + protected ListNode2<T> prev; + protected ListNode2<T> next; + + public ListNode2(T value) { + this.value = value; + } + + public ListNode2<T> getPrev() { + return prev; + } + + public ListNode2<T> getNext() { + return next; + } + + public T getValue() { + return value; + } + + public void setValue(T value) { + this.value = value; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java index 9b4c893..6ea4e7a 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java @@ -20,7 +20,6 @@ package org.apache.kylin.measure.topn; import java.util.Map; -import org.apache.kylin.common.topn.TopNCounter; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.measure.MeasureAggregator; http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java new file mode 100644 index 0000000..fd35309 --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kylin.measure.topn;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;

import org.apache.kylin.common.util.Pair;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

/**
 * Modified from the StreamSummary.java in https://github.com/addthis/stream-lib
 *
 * Based on the <i>Space-Saving</i> algorithm and the <i>Stream-Summary</i>
 * data structure as described in:
 * <i>Efficient Computation of Frequent and Top-k Elements in Data Streams</i>
 * by Metwally, Agrawal, and Abbadi
 *
 * <p>Counters are kept in a {@link DoublyLinkedList} ordered by count, with the
 * largest count at the head and the smallest at the tail (so head-to-tail
 * traversal follows {@code getPrev()}). Not thread-safe.
 *
 * @param <T> type of data in the stream to be summarized
 */
public class TopNCounter<T> implements Iterable<Counter<T>> {

    public static final int EXTRA_SPACE_RATE = 50;

    protected int capacity;
    // item -> its node in counterList, for O(1) lookup on offer()
    private HashMap<T, ListNode2<Counter<T>>> counterMap;
    // counters sorted by count, biggest at head()
    protected DoublyLinkedList<Counter<T>> counterList;

    /**
     * @param capacity maximum size (larger capacities improve accuracy)
     */
    public TopNCounter(int capacity) {
        this.capacity = capacity;
        counterMap = new HashMap<T, ListNode2<Counter<T>>>();
        counterList = new DoublyLinkedList<Counter<T>>();
    }

    /** @return the maximum number of counters tracked */
    public int getCapacity() {
        return capacity;
    }

    /**
     * Algorithm: <i>Space-Saving</i>
     *
     * @param item stream element (<i>e</i>)
     * @return false if item was already in the stream summary, true otherwise
     */
    public boolean offer(T item) {
        return offer(item, 1.0);
    }

    /**
     * Algorithm: <i>Space-Saving</i>
     *
     * @param item stream element (<i>e</i>)
     * @param incrementCount amount to add to the item's count (may be negative)
     * @return false if item was already in the stream summary, true otherwise
     */
    public boolean offer(T item, double incrementCount) {
        return offerReturnAll(item, incrementCount).getFirst();
    }

    /**
     * @param item stream element (<i>e</i>)
     * @return item dropped from summary if an item was dropped, null otherwise
     */
    public T offerReturnDropped(T item, double incrementCount) {
        return offerReturnAll(item, incrementCount).getSecond();
    }

    /**
     * Core of the Space-Saving update: a new item either takes a free slot or
     * evicts the current minimum (tail) counter, whose node is recycled.
     *
     * @param item stream element (<i>e</i>)
     * @return Pair&lt;isNewItem, itemDropped&gt; where isNewItem is the return value
     *         of offer() and itemDropped is null if no item was dropped
     */
    public Pair<Boolean, T> offerReturnAll(T item, double incrementCount) {
        ListNode2<Counter<T>> counterNode = counterMap.get(item);
        boolean isNewItem = (counterNode == null);
        T droppedItem = null;
        if (isNewItem) {

            if (size() < capacity) {
                // Free slot: new minimum goes to the tail.
                counterNode = counterList.enqueue(new Counter<T>(item));
            } else {
                // Full: evict the current minimum and reuse its node.
                // NOTE: count is reset to 0 here (the stream-lib original kept
                // the evicted count as an error bound); the increment below
                // then becomes the new item's count.
                counterNode = counterList.tail();
                Counter<T> counter = counterNode.getValue();
                droppedItem = counter.item;
                counterMap.remove(droppedItem);
                counter.item = item;
                counter.count = 0.0;
            }
            counterMap.put(item, counterNode);
        }

        incrementCounter(counterNode, incrementCount);

        return Pair.newPair(isNewItem, droppedItem);
    }

    /**
     * Adds {@code incrementCount} to the node's counter and re-inserts the node
     * at its new sorted position (toward the head for positive increments,
     * toward the tail for negative ones).
     */
    protected void incrementCounter(ListNode2<Counter<T>> counterNode, double incrementCount) {
        Counter<T> counter = counterNode.getValue();
        counter.count += incrementCount;

        ListNode2<Counter<T>> nodeNext;

        if (incrementCount > 0) {
            nodeNext = counterNode.getNext();
        } else {
            nodeNext = counterNode.getPrev();
        }
        // Detach, then walk to the first node whose count ordering is satisfied.
        counterList.remove(counterNode);
        counterNode.prev = null;
        counterNode.next = null;

        if (incrementCount > 0) {
            // Walk toward the head past smaller-or-equal counts.
            while (nodeNext != null && counter.count >= nodeNext.getValue().count) {
                nodeNext = nodeNext.getNext();
            }
            if (nodeNext != null) {
                counterList.addBefore(nodeNext, counterNode);
            } else {
                // Larger than everything: becomes the new head.
                counterList.add(counterNode);
            }

        } else {
            // Walk toward the tail past strictly larger counts.
            while (nodeNext != null && counter.count < nodeNext.getValue().count) {
                nodeNext = nodeNext.getPrev();
            }
            if (nodeNext != null) {
                counterList.addAfter(nodeNext, counterNode);
            } else {
                // Smaller than everything: becomes the new tail.
                counterList.enqueue(counterNode);
            }
        }
    }

    /**
     * @param k maximum number of items to return
     * @return up to k items with the largest counts, in descending count order
     */
    public List<T> peek(int k) {
        List<T> topK = new ArrayList<T>(k);

        // head-to-tail traversal follows getPrev() (see class javadoc)
        for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) {
            Counter<T> b = bNode.getValue();
            if (topK.size() == k) {
                return topK;
            }
            topK.add(b.item);
        }

        return topK;
    }

    /**
     * @param k maximum number of counters to return
     * @return up to k counters with the largest counts, in descending count order
     */
    public List<Counter<T>> topK(int k) {
        List<Counter<T>> topK = new ArrayList<Counter<T>>(k);

        for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) {
            Counter<T> b = bNode.getValue();
            if (topK.size() == k) {
                return topK;
            }
            topK.add(b);
        }

        return topK;
    }

    /**
     * @return number of items stored
     */
    public int size() {
        return counterMap.size();
    }

    @Override
    public String toString() {
        // NOTE(review): entries are concatenated with no separator between
        // them ("[a:1.0b:2.0]"); kept as-is in case callers parse this format.
        StringBuilder sb = new StringBuilder();
        sb.append('[');
        for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; bNode = bNode.getPrev()) {
            Counter<T> b = bNode.getValue();
            sb.append(b.item);
            sb.append(':');
            sb.append(b.count);
        }
        sb.append(']');
        return sb.toString();
    }

    /**
     * Put element to the head position;
     * The consumer should call this method with count in ascending way; the item will be directly put to the
     * head of the list, without comparison for best performance;
     * @param item
     * @param count
     */
    public void offerToHead(T item, double count) {
        Counter<T> c = new Counter<T>(item);
        c.count = count;
        ListNode2<Counter<T>> node = counterList.add(c);
        counterMap.put(c.item, node);
    }

    /**
     * Merge another counter into this counter; items present in only one side
     * are credited with the other side's minimum count (m1/m2) as the standard
     * Space-Saving merge error bound.
     * @param another the counter to merge in (not modified)
     * @return this counter, after the merge
     */
    public TopNCounter<T> merge(TopNCounter<T> another) {
        // m1/m2: minimum counts, used as the error bound for absent items.
        double m1 = 0.0, m2 = 0.0;
        if (this.size() >= this.capacity) {
            m1 = this.counterList.tail().getValue().count;
        }

        if (another.size() >= another.capacity) {
            m2 = another.counterList.tail().getValue().count;
        }

        Set<T> duplicateItems = Sets.newHashSet();
        List<T> notDuplicateItems = Lists.newArrayList();

        for (Map.Entry<T, ListNode2<Counter<T>>> entry : this.counterMap.entrySet()) {
            T item = entry.getKey();
            ListNode2<Counter<T>> existing = another.counterMap.get(item);
            if (existing != null) {
                duplicateItems.add(item);
            } else {
                notDuplicateItems.add(item);
            }
        }

        // Items in both: add the other side's exact count.
        for (T item : duplicateItems) {
            this.offer(item, another.counterMap.get(item).getValue().count);
        }

        // Items only in this: the other side may have seen them up to m2 times.
        for (T item : notDuplicateItems) {
            this.offer(item, m2);
        }

        // Items only in the other: this side may have seen them up to m1 times.
        for (Map.Entry<T, ListNode2<Counter<T>>> entry : another.counterMap.entrySet()) {
            T item = entry.getKey();
            if (!duplicateItems.contains(item)) {
                double counter = entry.getValue().getValue().count;
                this.offer(item, counter + m1);
            }
        }

        return this;
    }

    /**
     * Retain the capacity to the given number; The extra counters will be cut off
     * @param newCapacity new maximum size, must be positive
     */
    public void retain(int newCapacity) {
        assert newCapacity > 0;
        this.capacity = newCapacity;
        if (newCapacity < this.size()) {
            // Drop the smallest counters (tail side) until we fit.
            ListNode2<Counter<T>> tail = counterList.tail();
            while (tail != null && this.size() > newCapacity) {
                Counter<T> bucket = tail.getValue();
                this.counterMap.remove(bucket.getItem());
                this.counterList.remove(tail);
                tail = this.counterList.tail();
            }
        }
    }

    /**
     * Get the counter values in ascending order
     * @return the counts, smallest first
     */
    public double[] getCounters() {
        double[] counters = new double[size()];
        int index = 0;

        // tail-to-head traversal follows getNext(): ascending counts
        for (ListNode2<Counter<T>> bNode = counterList.tail(); bNode != null; bNode = bNode.getNext()) {
            Counter<T> b = bNode.getValue();
            counters[index] = b.count;
            index++;
        }

        assert index == size();
        return counters;
    }

    @Override
    public Iterator<Counter<T>> iterator() {
        return new TopNCounterIterator();
    }

    /**
     * Iterator from the tail (smallest) to head (biggest);
     */
    private class TopNCounterIterator implements Iterator<Counter<T>> {

        private ListNode2<Counter<T>> currentBNode;

        private TopNCounterIterator() {
            currentBNode = counterList.tail();
        }

        @Override
        public boolean hasNext() {
            return currentBNode != null;
        }

        @Override
        public Counter<T> next() {
            // Iterator contract: signal exhaustion explicitly instead of
            // letting the null node surface as a NullPointerException.
            if (currentBNode == null) {
                throw new NoSuchElementException();
            }
            Counter<T> counter = currentBNode.getValue();
            currentBNode = currentBNode.getNext();
            return counter;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

}
---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java index 06493f7..0f79c1d 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java @@ -24,8 +24,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.kylin.common.topn.Counter; -import org.apache.kylin.common.topn.TopNCounter; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.Dictionary; http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java index abe11e3..5b50241 100644 --- a/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java +++ b/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java @@ -2,7 +2,7 @@ package org.apache.kylin.aggregation.topn; import java.nio.ByteBuffer; -import org.apache.kylin.common.topn.TopNCounter; +import org.apache.kylin.measure.topn.TopNCounter; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.measure.topn.TopNCounterSerializer; http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/core-metadata/src/test/java/org/apache/kylin/measure/hll/HyperLogLogCounterTest.java ---------------------------------------------------------------------- diff --git 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kylin.measure.hll;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link HyperLogLogPlusCounter}: serialization round-trips,
 * estimation accuracy on random data, merge accuracy, and merge performance.
 *
 * <p>NOTE: the three {@code Random} fields are seeded and consumed in a fixed
 * order across helper methods, so the generated test data (and therefore the
 * statistical assertions) depend on statement order staying as-is.
 *
 * @author yangli9
 *
 */
public class HyperLogLogCounterTest {

    // Scratch buffer reused by every (de)serialization round-trip.
    ByteBuffer buf = ByteBuffer.allocate(1024 * 1024);
    // Fixed seeds keep the statistical tests reproducible.
    Random rand1 = new Random(1);
    Random rand2 = new Random(2);
    Random rand3 = new Random(3);
    // Counts of merge runs whose error exceeded 1x / 2x / 3x the error rate.
    int errorCount1 = 0;
    int errorCount2 = 0;
    int errorCount3 = 0;

    /**
     * After every add(), the serialized form must report its own length via
     * peekLength(), and a read-back copy must equal the original.
     */
    @Test
    public void testPeekLength() throws IOException {
        HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter(10);
        HyperLogLogPlusCounter copy = new HyperLogLogPlusCounter(10);
        byte[] value = new byte[10];
        for (int i = 0; i < 200000; i++) {
            rand1.nextBytes(value);
            hllc.add(value);

            buf.clear();
            hllc.writeRegisters(buf);

            int len = buf.position();
            buf.position(0);
            assertEquals(len, hllc.peekLength(buf));

            copy.readRegisters(buf);
            assertEquals(len, buf.position());
            assertEquals(hllc, copy);
        }
        buf.clear();
    }

    // Builds a set of distinct sample strings by running n rounds of
    // generateSampleData() and collecting the results.
    private Set<String> generateTestData(int n) {
        Set<String> testData = new HashSet<String>();
        for (int i = 0; i < n; i++) {
            String[] samples = generateSampleData();
            for (String sample : samples) {
                testData.add(sample);
            }
        }
        return testData;
    }

    // simulate the visit (=visitor+id)
    private String[] generateSampleData() {

        // NOTE: this local StringBuilder shadows the ByteBuffer field "buf".
        StringBuilder buf = new StringBuilder();
        for (int i = 0; i < 19; i++) {
            buf.append(Math.abs(rand1.nextInt()) % 10);
        }
        String header = buf.toString();

        // 1..9 ids sharing the same 19-digit visitor header.
        int size = Math.abs(rand3.nextInt()) % 9 + 1;
        String[] samples = new String[size];
        for (int k = 0; k < size; k++) {
            buf = new StringBuilder(header);
            buf.append("-");
            for (int i = 0; i < 10; i++) {
                buf.append(Math.abs(rand3.nextInt()) % 10);
            }
            samples[k] = buf.toString();
        }

        return samples;
    }

    /** Checks estimation accuracy at cardinalities 10, 100, ..., 100000. */
    @Test
    public void countTest() throws IOException {
        int n = 10;
        for (int i = 0; i < 5; i++) {
            count(n);
            n *= 10;
        }
    }

    // Adds ~n rounds of sample data, then asserts the estimate's actual error
    // is within 3x the counter's theoretical error rate.
    private void count(int n) throws IOException {
        Set<String> testSet = generateTestData(n);

        HyperLogLogPlusCounter hllc = newHLLC();
        for (String testData : testSet) {
            hllc.add(Bytes.toBytes(testData));
        }
        long estimate = hllc.getCountEstimate();
        double errorRate = hllc.getErrorRate();
        double actualError = (double) Math.abs(testSet.size() - estimate) / testSet.size();
        System.out.println(estimate);
        System.out.println(testSet.size());
        System.out.println(errorRate);
        System.out.println("=" + actualError);
        Assert.assertTrue(actualError < errorRate * 3.0);

        checkSerialize(hllc);
    }

    // Round-trips the counter through the shared buffer and asserts the
    // estimate is unchanged.
    private void checkSerialize(HyperLogLogPlusCounter hllc) throws IOException {
        long estimate = hllc.getCountEstimate();
        buf.clear();
        hllc.writeRegisters(buf);
        buf.flip();
        hllc.readRegisters(buf);
        Assert.assertEquals(estimate, hllc.getCountEstimate());
    }

    /**
     * Runs 100 merge rounds and asserts the fraction of rounds whose error
     * exceeds 1x/2x/3x the error rate stays under 40%/8%/2%.
     */
    @Test
    public void mergeTest() throws IOException {
        double error = 0;
        double absError = 0;
        int n = 100;
        for (int i = 0; i < n; i++) {
            double e = merge();
            error += e;
            absError += Math.abs(e);
        }
        System.out.println("Total average error is " + error / n + " and absolute error is " + absError / n);

        System.out.println(" errorRateCount1 is " + errorCount1 + "!");
        System.out.println(" errorRateCount2 is " + errorCount2 + "!");
        System.out.println(" errorRateCount3 is " + errorCount3 + "!");

        Assert.assertTrue(errorCount1 <= n * 0.40);
        Assert.assertTrue(errorCount2 <= n * 0.08);
        Assert.assertTrue(errorCount3 <= n * 0.02);
    }

    // Fills 50 counters with sample data, merges them all into one, and
    // returns the signed relative error of the merged estimate. Also bumps
    // the errorCountN tallies used by mergeTest().
    private double merge() throws IOException {

        int ln = 50;
        int dn = 300;
        Set<String> testSet = new HashSet<String>();
        HyperLogLogPlusCounter[] hllcs = new HyperLogLogPlusCounter[ln];
        for (int i = 0; i < ln; i++) {
            hllcs[i] = newHLLC();
            for (int k = 0; k < dn; k++) {
                String[] samples = generateSampleData();
                for (String data : samples) {
                    testSet.add(data);
                    hllcs[i].add(Bytes.toBytes(data));
                }
            }
        }
        HyperLogLogPlusCounter mergeHllc = newHLLC();
        for (HyperLogLogPlusCounter hllc : hllcs) {
            mergeHllc.merge(hllc);
            checkSerialize(mergeHllc);
        }

        double errorRate = mergeHllc.getErrorRate();
        long estimate = mergeHllc.getCountEstimate();
        double actualError = (double) (testSet.size() - estimate) / testSet.size();

        System.out.println(testSet.size() + "-" + estimate + " ~ " + actualError);

        if (Math.abs(actualError) > errorRate) {
            errorCount1++;
        }
        if (Math.abs(actualError) > 2 * errorRate) {
            errorCount2++;
        }
        if (Math.abs(actualError) > 3 * errorRate) {
            errorCount3++;
        }

        return actualError;
    }

    /** Rough timing of repeated merge+serialize cycles (prints, no assertion). */
    @Test
    public void testPerformance() throws IOException {
        int N = 3; // reduce N HLLC into one
        int M = 1000; // for M times, use 100000 for real perf test

        HyperLogLogPlusCounter samples[] = new HyperLogLogPlusCounter[N];
        for (int i = 0; i < N; i++) {
            samples[i] = newHLLC();
            for (String str : generateTestData(10000))
                samples[i].add(str);
        }

        System.out.println("Perf test running ... ");
        long start = System.currentTimeMillis();
        HyperLogLogPlusCounter sum = newHLLC();
        for (int i = 0; i < M; i++) {
            sum.clear();
            for (int j = 0; j < N; j++) {
                sum.merge(samples[j]);
                checkSerialize(sum);
            }
        }
        long duration = System.currentTimeMillis() - start;
        System.out.println("Perf test result: " + duration / 1000 + " seconds");
    }

    /**
     * add(bytes, offset, length) over a sub-range must be equivalent to
     * add() of the same bytes as a standalone array.
     */
    @Test
    public void testEquivalence() {
        byte[] a = new byte[] { 0, 3, 4, 42, 2, 2 };
        byte[] b = new byte[] { 3, 4, 42 };
        HyperLogLogPlusCounter ha = new HyperLogLogPlusCounter();
        HyperLogLogPlusCounter hb = new HyperLogLogPlusCounter();
        ha.add(a, 1, 3); // bytes 1..3 of a == all of b
        hb.add(b);

        Assert.assertTrue(ha.getCountEstimate() == hb.getCountEstimate());
    }

    // Counter with precision 16, shared by all accuracy tests.
    private HyperLogLogPlusCounter newHLLC() {
        return new HyperLogLogPlusCounter(16);
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kylin.measure.topn;

import static org.junit.Assert.*;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Random;

import org.junit.Test;

/**
 * Round-trip tests for {@link DoubleDeltaSerializer}: empty input, a single
 * value, rounding to two decimals, and a large random sorted array.
 */
public class DoubleDeltaSerializerTest {

    // Scratch buffer shared by all round-trips.
    ByteBuffer buf = ByteBuffer.allocate(8192);
    DoubleDeltaSerializer dds = new DoubleDeltaSerializer();

    /** Serializes {@code values} into the shared buffer and reads them back. */
    private double[] roundTrip(double[] values) {
        buf.clear();
        dds.serialize(values, buf);
        buf.flip();
        return dds.deserialize(buf);
    }

    /** An empty array must survive the round-trip as an empty array. */
    @Test
    public void testEmpty() {
        double[] decoded = roundTrip(new double[] {});
        assertTrue(decoded.length == 0);
    }

    /** A single value must come back (within the 0.01 tolerance). */
    @Test
    public void testSingle() {
        double[] decoded = roundTrip(new double[] { 1.2 });
        assertArrayEquals(new double[] { 1.2 }, decoded);
    }

    /** Values are rounded to two decimal places by the serializer. */
    @Test
    public void testRounding() {
        double[] decoded = roundTrip(new double[] { 1.234, 2.345 });
        assertArrayEquals(new double[] { 1.23, 2.35 }, decoded);
    }

    /** A large sorted random array round-trips; also reports compression. */
    @Test
    public void testRandom() {
        Random rand = new Random();
        int n = 1000;

        double[] nums = new double[n];
        for (int idx = 0; idx < n; idx++) {
            nums[idx] = rand.nextDouble() * 1000000;
        }
        Arrays.sort(nums);

        double[] decoded = roundTrip(nums);
        assertArrayEquals(nums, decoded);
        System.out.println("doubles size of " + (n * 8) + " bytes serialized to " + buf.limit() + " bytes");
    }

    // Element-wise comparison with the serializer's 0.01 rounding tolerance
    // (intentionally shadows org.junit.Assert.assertArrayEquals).
    private static void assertArrayEquals(double[] expected, double[] actual) {
        assertEquals(expected.length, actual.length);
        for (int idx = 0; idx < expected.length; idx++) {
            assertEquals(expected[idx], actual[idx], 0.01);
        }
    }
}
/*
 * Copyright (C) 2011 Clearspring Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kylin.measure.topn;

import org.junit.Test;

import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
 * Basic functional tests of {@link TopNCounter}: top-k queries, weighted
 * offers, capacity retention, and merging of two counters.
 */
public class TopNCounterBasicTest {

    /** Offers a small stream and prints the resulting counters (smoke test). */
    @Test
    public void testTopNCounter() {
        TopNCounter<String> counter = new TopNCounter<String>(3);
        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "A", "A", "Y" };
        for (String item : stream) {
            counter.offer(item);
        }

        List<Counter<String>> topk = counter.topK(6);

        for (Counter<String> entry : topk) {
            System.out.println(entry.getItem() + ":" + entry.getCount());
        }
    }

    /** With capacity 3 the top-3 of this stream must be A, C, X. */
    @Test
    public void testTopK() {
        TopNCounter<String> counter = new TopNCounter<String>(3);
        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" };
        for (String item : stream) {
            counter.offer(item);
        }
        List<Counter<String>> topK = counter.topK(3);
        for (Counter<String> entry : topK) {
            assertTrue(Arrays.asList("A", "C", "X").contains(entry.getItem()));
        }
    }

    /** Weighted offers (x10) must yield the same top-3 membership. */
    @Test
    public void testTopKWithIncrement() {
        TopNCounter<String> counter = new TopNCounter<String>(3);
        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" };
        for (String item : stream) {
            counter.offer(item, 10);
        }
        List<Counter<String>> topK = counter.topK(3);
        for (Counter<String> entry : topK) {
            assertTrue(Arrays.asList("A", "C", "X").contains(entry.getItem()));
        }
    }

    /** One weighted offer must rank identically to that many unit offers. */
    @Test
    public void testTopKWithIncrementOutOfOrder() {
        TopNCounter<String> batchCounter = new TopNCounter<String>(3);
        TopNCounter<String> unitCounter = new TopNCounter<String>(3);
        String[] stream = { "A", "B", "C", "D", "A" };
        Integer[] increments = { 15, 20, 25, 30, 1 };

        for (int i = 0; i < stream.length; i++) {
            batchCounter.offer(stream[i], increments[i]);
            for (int k = 0; k < increments[i]; k++) {
                unitCounter.offer(stream[i]);
            }
        }
        System.out.println("Insert with counts vs. single inserts:");
        System.out.println(batchCounter);
        System.out.println(unitCounter);

        List<Counter<String>> batchTopK = batchCounter.topK(3);
        List<Counter<String>> unitTopK = unitCounter.topK(3);

        for (int i = 0; i < batchTopK.size(); i++) {
            assertEquals(batchTopK.get(i).getItem(), unitTopK.get(i).getItem());
        }
    }

    /** retain() must shrink both the stored counters and the capacity. */
    @Test
    public void testRetain() {
        TopNCounter<String> counter = new TopNCounter<String>(10);
        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" };
        for (String item : stream) {
            counter.offer(item);
        }

        counter.retain(5);
        assertTrue(counter.size() <= 5);
        assertTrue(counter.getCapacity() <= 5);
    }

    /** Merging two counters must surface the jointly most frequent items. */
    @Test
    public void testMerge() {

        TopNCounter<String> counter = new TopNCounter<String>(10);
        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "B", "A" };
        for (String item : stream) {
            counter.offer(item);
        }

        String[] stream2 = { "B", "B", "Z", "Z", "B", "C", "X", "X" };
        TopNCounter<String> counter2 = new TopNCounter<String>(10);
        for (String item : stream2) {
            counter2.offer(item);
        }
        // Combined counts: X: 4+2, C: 2+1, A: 3+0, B: 2+3, Y: 1+0, Z: 1+2
        counter.merge(counter2);
        List<Counter<String>> topK = counter.topK(3);
        for (Counter<String> entry : topK) {
            assertTrue(Arrays.asList("A", "B", "X").contains(entry.getItem()));
        }
    }
}
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.measure.topn; + +import org.junit.Ignore; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; + +@RunWith(Parameterized.class) +@Ignore("For collecting accuracy statistics, not for functional test") +public class TopNCounterCombinationTest extends TopNCounterTest { + + @Parameterized.Parameters + public static Collection<Integer[]> configs() { + return Arrays.asList(new Integer[][] { + // with 20X space + { 10, 20 }, // top 10% + { 20, 20 }, // top 5% + { 100, 20 }, // top 1% + { 1000, 20 }, // top 0.1% + + // with 50X space + { 10, 50 }, // top 10% + { 20, 50 }, // top 5% + { 100, 50 }, // top 1% + { 1000, 50 }, // top 0.1% + + // with 100X space + { 10, 100 }, // top 10% + { 20, 100 }, // top 5% + { 100, 100 }, // top 1% + { 1000, 100 }, // top 0.1% + }); + } + + public TopNCounterCombinationTest(int keySpaceRate, int spaceSavingRate) throws Exception { + super(); + this.TOP_K = 100; + this.KEY_SPACE = TOP_K * keySpaceRate; + this.SPACE_SAVING_ROOM = spaceSavingRate; + TOTAL_RECORDS = 1000000; // 1 million + this.PARALLEL = 10; + this.verbose = true; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterTest.java ---------------------------------------------------------------------- diff --git 
a/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterTest.java new file mode 100644 index 0000000..7b7031b --- /dev/null +++ b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterTest.java @@ -0,0 +1,306 @@ +/* + * Copyright (C) 2011 Clearspring Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.measure.topn; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.commons.math3.distribution.ZipfDistribution; +import org.apache.kylin.common.util.Pair; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.*; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +@Ignore("For collecting accuracy statistics, not for functional test") +public class TopNCounterTest { + + protected static int TOP_K; + + protected static int KEY_SPACE; + + protected static int TOTAL_RECORDS; + + protected static int SPACE_SAVING_ROOM; + + protected static int PARALLEL = 10; + + protected static boolean verbose = true; + + public TopNCounterTest() { + TOP_K = 100; + KEY_SPACE = 100 * TOP_K; + TOTAL_RECORDS = 1000000; // 1 million + SPACE_SAVING_ROOM = 100; + } + + protected String prepareTestDate() throws IOException 
{ + String[] allKeys = new String[KEY_SPACE]; + + for (int i = 0; i < KEY_SPACE; i++) { + allKeys[i] = RandomStringUtils.randomAlphabetic(10); + } + + outputMsg("Start to create test random data..."); + long startTime = System.currentTimeMillis(); + ZipfDistribution zipf = new ZipfDistribution(KEY_SPACE, 0.5); + int keyIndex; + + File tempFile = File.createTempFile("ZipfDistribution", ".txt"); + + if (tempFile.exists()) + FileUtils.forceDelete(tempFile); + FileWriter fw = new FileWriter(tempFile); + try { + for (int i = 0; i < TOTAL_RECORDS; i++) { + keyIndex = zipf.sample() -1; + fw.write(allKeys[keyIndex]); + fw.write('\n'); + } + } finally { + if (fw != null) + fw.close(); + } + + outputMsg("Create test data takes : " + (System.currentTimeMillis() - startTime) / 1000 + " seconds."); + outputMsg("Test data in : " + tempFile.getAbsolutePath()); + + return tempFile.getAbsolutePath(); + } + + //@Test + public void testSingleSpaceSaving() throws IOException { + String dataFile = prepareTestDate(); + TopNCounterTest.SpaceSavingConsumer spaceSavingCounter = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM); + TopNCounterTest.HashMapConsumer accurateCounter = new TopNCounterTest.HashMapConsumer(); + + for (TopNCounterTest.TestDataConsumer consumer : new TopNCounterTest.TestDataConsumer[] { spaceSavingCounter, accurateCounter }) { + feedDataToConsumer(dataFile, consumer, 0, TOTAL_RECORDS); + } + + FileUtils.forceDelete(new File(dataFile)); + + compareResult(spaceSavingCounter, accurateCounter); + } + + private void compareResult(TopNCounterTest.TestDataConsumer firstConsumer, TopNCounterTest.TestDataConsumer secondConsumer) { + List<Pair<String, Double>> topResult1 = firstConsumer.getTopN(TOP_K); + outputMsg("Get topN, Space saving takes " + firstConsumer.getSpentTime() / 1000 + " seconds"); + List<Pair<String, Double>> realSequence = secondConsumer.getTopN(TOP_K); + outputMsg("Get topN, Merge sort takes " + secondConsumer.getSpentTime() / 1000 + " 
seconds"); + + int error = 0; + for (int i = 0; i < topResult1.size(); i++) { + outputMsg("Compare " + i); + + if (isClose(topResult1.get(i).getSecond().doubleValue(), realSequence.get(i).getSecond().doubleValue())) { + // if (topResult1.get(i).getFirst().equals(realSequence.get(i).getFirst()) && topResult1.get(i).getSecond().doubleValue() == realSequence.get(i).getSecond().doubleValue()) { + outputMsg("Passed; key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond()); + } else { + outputMsg("Failed; space saving key:" + topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond()); + outputMsg("Failed; correct key:" + realSequence.get(i).getFirst() + ", value:" + realSequence.get(i).getSecond()); + error++; + } + } + + org.junit.Assert.assertEquals(0, error); + } + + private boolean isClose(double value1, double value2) { + + if(Math.abs(value1 - value2) < 5.0) + return true; + + return false; + } + + @Test + public void testParallelSpaceSaving() throws IOException, ClassNotFoundException { + String dataFile = prepareTestDate(); + + TopNCounterTest.SpaceSavingConsumer[] parallelCounters = new TopNCounterTest.SpaceSavingConsumer[PARALLEL]; + + for (int i = 0; i < PARALLEL; i++) { + parallelCounters[i] = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM); + } + + int slice = TOTAL_RECORDS / PARALLEL; + int startPosition = 0; + for (int i = 0; i < PARALLEL; i++) { + feedDataToConsumer(dataFile, parallelCounters[i], startPosition, startPosition + slice); + startPosition += slice; + } + + TopNCounterTest.SpaceSavingConsumer[] mergedCounters = singleMerge(parallelCounters); + + TopNCounterTest.HashMapConsumer accurateCounter = new TopNCounterTest.HashMapConsumer(); + feedDataToConsumer(dataFile, accurateCounter, 0, TOTAL_RECORDS); + + compareResult(mergedCounters[0], accurateCounter); + FileUtils.forceDelete(new File(dataFile)); + + } + + private TopNCounterTest.SpaceSavingConsumer[] 
singleMerge(TopNCounterTest.SpaceSavingConsumer[] consumers) throws IOException, ClassNotFoundException { + List<TopNCounterTest.SpaceSavingConsumer> list = Lists.newArrayList(); + if (consumers.length == 1) + return consumers; + + TopNCounterTest.SpaceSavingConsumer merged = new TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM); + + for (int i=0, n=consumers.length; i<n; i++) { + merged.vs.merge(consumers[i].vs); + } + + merged.vs.retain(TOP_K * SPACE_SAVING_ROOM); // remove extra elements; + return new TopNCounterTest.SpaceSavingConsumer[] {merged}; + + } + + private TopNCounterTest.SpaceSavingConsumer[] binaryMerge(TopNCounterTest.SpaceSavingConsumer[] consumers) throws IOException, ClassNotFoundException { + List<TopNCounterTest.SpaceSavingConsumer> list = Lists.newArrayList(); + if (consumers.length == 1) + return consumers; + + + for (int i = 0, n = consumers.length; i < n; i = i + 2) { + if (i + 1 < n) { + consumers[i].vs.merge(consumers[i + 1].vs); + } + + list.add(consumers[i]); + } + + return binaryMerge(list.toArray(new TopNCounterTest.SpaceSavingConsumer[list.size()])); + } + + + private void feedDataToConsumer(String dataFile, TopNCounterTest.TestDataConsumer consumer, int startLine, int endLine) throws IOException { + long startTime = System.currentTimeMillis(); + BufferedReader bufferedReader = new BufferedReader(new FileReader(dataFile)); + + int lineNum = 0; + String line = bufferedReader.readLine(); + while (line != null) { + if (lineNum >= startLine && lineNum < endLine) { + consumer.addElement(line, 1.0); + } + line = bufferedReader.readLine(); + lineNum++; + } + + bufferedReader.close(); + outputMsg("feed data to " + consumer.getClass().getCanonicalName() + " take time (seconds): " + (System.currentTimeMillis() - startTime) / 1000); + } + + private void outputMsg(String msg) { + if (verbose) + System.out.println(msg); + } + + private static interface TestDataConsumer { + public void addElement(String elementKey, double value); + + 
public List<Pair<String, Double>> getTopN(int k); + + public long getSpentTime(); + } + + private class SpaceSavingConsumer implements TopNCounterTest.TestDataConsumer { + private long timeSpent = 0; + protected TopNCounter<String> vs; + + public SpaceSavingConsumer(int space) { + vs = new TopNCounter<String>(space); + + } + + public void addElement(String key, double value) { + //outputMsg("Adding " + key + ":" + incrementCount); + long startTime = System.currentTimeMillis(); + vs.offer(key, value); + timeSpent += (System.currentTimeMillis() - startTime); + } + + @Override + public List<Pair<String, Double>> getTopN(int k) { + long startTime = System.currentTimeMillis(); + List<Counter<String>> tops = vs.topK(k); + List<Pair<String, Double>> allRecords = Lists.newArrayList(); + + for (Counter<String> counter : tops) + allRecords.add(Pair.newPair(counter.getItem(), counter.getCount())); + timeSpent += (System.currentTimeMillis() - startTime); + return allRecords; + } + + @Override + public long getSpentTime() { + return timeSpent; + } + } + + private class HashMapConsumer implements TopNCounterTest.TestDataConsumer { + + private long timeSpent = 0; + private Map<String, Double> hashMap; + + public HashMapConsumer() { + hashMap = Maps.newHashMap(); + } + + public void addElement(String key, double value) { + long startTime = System.currentTimeMillis(); + if (hashMap.containsKey(key)) { + hashMap.put(key, hashMap.get(key) + value); + } else { + hashMap.put(key, value); + } + timeSpent += (System.currentTimeMillis() - startTime); + } + + @Override + public List<Pair<String, Double>> getTopN(int k) { + long startTime = System.currentTimeMillis(); + List<Pair<String, Double>> allRecords = Lists.newArrayList(); + + for (Map.Entry<String, Double> entry : hashMap.entrySet()) { + allRecords.add(Pair.newPair(entry.getKey(), entry.getValue())); + } + + Collections.sort(allRecords, new Comparator<Pair<String, Double>>() { + @Override + public int compare(Pair<String, Double> 
o1, Pair<String, Double> o2) { + return o1.getSecond() < o2.getSecond() ? 1 : (o1.getSecond() > o2.getSecond() ? -1 : 0); + } + }); + timeSpent += (System.currentTimeMillis() - startTime); + return allRecords.subList(0, k); + } + + @Override + public long getSpentTime() { + return timeSpent; + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java index 58900e0..3aad6ae 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java @@ -40,7 +40,7 @@ import org.apache.hadoop.io.SequenceFile.Reader; import org.apache.hadoop.io.SequenceFile.Reader.Option; import org.apache.hadoop.util.ReflectionUtils; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java index a1582cb..02fe0f0 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java @@ -24,7 +24,7 @@ import 
org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.kv.RowConstants; http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index 65d1530..6b54f17 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.CubeInstance; http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java index d8880dd..c72e5d6 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java +++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java @@ -24,7 +24,7 @@ import java.util.Collection; import java.util.List; import org.apache.hadoop.io.Text; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.cuboid.CuboidScheduler; http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java index 05b56aa..0a6c123 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java @@ -38,7 +38,7 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.util.ReflectionUtils; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java index 410cec7..b9138fb 100644 --- 
a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java @@ -19,11 +19,10 @@ package org.apache.kylin.engine.mr.steps; import java.util.ArrayList; -import java.util.BitSet; import java.util.List; import org.apache.commons.lang.RandomStringUtils; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; import org.junit.Before; http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java index 5f2f100..f46683e 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java @@ -7,7 +7,7 @@ import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.common.CuboidStatsUtil; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java index 7f5ab6b..285729f 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java @@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.Dictionary; @@ -57,7 +57,6 @@ import org.apache.kylin.cube.model.*; import org.apache.kylin.cube.util.CubingUtils; import org.apache.kylin.dict.*; import org.apache.kylin.engine.mr.common.CubeStatsReader; -import org.apache.kylin.engine.mr.steps.InMemCuboidJob; import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter; import org.apache.kylin.engine.spark.cube.DefaultTupleConverter; import org.apache.kylin.engine.spark.util.IteratorUtils; http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java index 31e6086..ef726a2 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java @@ -35,7 +35,7 @@ package org.apache.kylin.engine.streaming; import java.util.Map; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import 
org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.cube.inmemcubing.ICuboidWriter; import org.apache.kylin.metadata.model.IBuildable; http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java index 1965f8c..3fbade2 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java @@ -35,7 +35,7 @@ package org.apache.kylin.engine.streaming; import java.util.Map; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.StreamingBatch; import org.apache.kylin.engine.streaming.util.StreamingUtils; http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java index 91cdea9..7946438 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java @@ -35,7 +35,7 @@ package org.apache.kylin.engine.streaming; import java.util.Map; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import 
org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.StreamingBatch; import org.apache.kylin.cube.inmemcubing.ICuboidWriter; http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java index 044dcca..d7056cf 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java @@ -45,7 +45,7 @@ import java.util.concurrent.LinkedBlockingQueue; import javax.annotation.Nullable; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedHLLCodec.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedHLLCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedHLLCodec.java index 8671f43..eb213b8 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedHLLCodec.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedHLLCodec.java @@ -19,7 +19,7 @@ package org.apache.kylin.invertedindex.measure; import java.nio.ByteBuffer; -import 
org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.metadata.datatype.DataType; /** http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java index 885656d..cbfd8c3 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java @@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IntWritable; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat; http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java index 988187d..19f5759 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java @@ -30,7 +30,7 @@ import 
java.util.Map; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.engine.mr.KylinReducer; http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java ---------------------------------------------------------------------- diff --git a/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java b/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java index 5e8788a..57721d6 100644 --- a/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java +++ b/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java @@ -33,11 +33,9 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mrunit.mapreduce.ReduceDriver; import org.apache.hadoop.mrunit.types.Pair; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.kv.RowConstants; -import org.apache.kylin.source.hive.cardinality.ColumnCardinalityMapper; -import org.apache.kylin.source.hive.cardinality.ColumnCardinalityReducer; import org.junit.Before; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java ---------------------------------------------------------------------- diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java index 76c6637..fa7d81e 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java @@ -22,7 +22,7 @@ import java.nio.ByteBuffer; import java.util.List; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.common.util.BytesSerializer; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.invertedindex.index.RawTableRecord; http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java index fdab8eb..770be3c 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.inmemcubing.ICuboidWriter;