// Source path: dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kylin.dict;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.LinkedList;

import org.apache.kylin.common.util.BytesUtil;

/**
 * Builds a dictionary using Trie structure. All values are taken in byte[] form
 * and organized in a Trie with ordering. Then numeric IDs are assigned in
 * sequence.
 *
 * <p>Typical usage: call {@link #addValue(Object)} for every value, then
 * {@link #build(int)} to flatten the trie into an immutable dictionary.
 *
 * @author yangli9
 */
public class TrieDictionaryBuilder<T> {

    /** A trie node holding a run of bytes ({@code part}) shared by all values beneath it. */
    public static class Node {
        public byte[] part;
        public boolean isEndOfValue;
        public ArrayList<Node> children;

        public int nValuesBeneath; // only present after stats()

        Node(byte[] value, boolean isEndOfValue) {
            reset(value, isEndOfValue);
        }

        Node(byte[] value, boolean isEndOfValue, ArrayList<Node> children) {
            reset(value, isEndOfValue, children);
        }

        /** Re-initializes this node with a fresh, empty child list. */
        void reset(byte[] value, boolean isEndOfValue) {
            reset(value, isEndOfValue, new ArrayList<Node>());
        }

        /** Re-initializes this node, adopting the given child list (not copied). */
        void reset(byte[] value, boolean isEndOfValue, ArrayList<Node> children) {
            this.part = value;
            this.isEndOfValue = isEndOfValue;
            this.children = children;
        }
    }

    /** Callback used by the traversal methods; {@code level} is 0 for the root. */
    public static interface Visitor {
        void visit(Node n, int level);
    }

    // ============================================================================

    private Node root;
    private BytesConverter<T> bytesConverter;

    // scratch buffer holding the bytes on the path from the root to the node
    // currently visited by checkOverflowParts()
    private final CompleteParts completeParts = new CompleteParts();

    public TrieDictionaryBuilder(BytesConverter<T> bytesConverter) {
        this.root = new Node(new byte[0], false);
        this.bytesConverter = bytesConverter;
    }

    /** Converts the value to bytes with the configured converter and adds it to the trie. */
    public void addValue(T value) {
        addValue(bytesConverter.convertToBytes(value));
    }

    /** Adds a value, given as raw bytes, to the trie. Adding the same value twice is a no-op. */
    public void addValue(byte[] value) {
        addValueR(root, value, 0);
    }

    /**
     * Recursively inserts {@code value[start..]} beneath {@code node}, splitting
     * nodes as needed and keeping each child list sorted by unsigned first byte.
     */
    private void addValueR(Node node, byte[] value, int start) {
        // match the value part of current node
        int i = 0, j = start;
        int n = node.part.length, nn = value.length;
        int comp = 0;
        for (; i < n && j < nn; i++, j++) {
            comp = BytesUtil.compareByteUnsigned(node.part[i], value[j]);
            if (comp != 0)
                break;
        }

        // if value fully matched within the current node
        if (j == nn) {
            // if equals to current node, just mark end of value
            if (i == n) {
                node.isEndOfValue = true;
            }
            // otherwise, split the current node into two
            else {
                Node c = new Node(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
                node.reset(BytesUtil.subarray(node.part, 0, i), true);
                node.children.add(c);
            }
            return;
        }

        // if partially matched the current, split the current node, add the new
        // value, make a 3-way
        if (i < n) {
            Node c1 = new Node(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
            Node c2 = new Node(BytesUtil.subarray(value, j, nn), true);
            node.reset(BytesUtil.subarray(node.part, 0, i), false);
            // comp compares the old node byte against the new value byte, so
            // comp < 0 means the existing remainder sorts first
            if (comp < 0) {
                node.children.add(c1);
                node.children.add(c2);
            } else {
                node.children.add(c2);
                node.children.add(c1);
            }
            return;
        }

        // the value extends beyond the current node; binary search the next
        // byte for a child node to continue in
        byte lookfor = value[j];
        int lo = 0;
        int hi = node.children.size() - 1;
        int mid = 0;
        boolean found = false;
        comp = 0;
        while (!found && lo <= hi) {
            mid = lo + (hi - lo) / 2;
            comp = BytesUtil.compareByteUnsigned(lookfor, node.children.get(mid).part[0]);
            if (comp < 0)
                hi = mid - 1;
            else if (comp > 0)
                lo = mid + 1;
            else
                found = true;
        }
        // found a child node matching the first byte, continue in that child
        if (found) {
            addValueR(node.children.get(mid), value, j);
        }
        // otherwise, make the value a new child, inserted at its sorted position
        else {
            Node c = new Node(BytesUtil.subarray(value, j, nn), true);
            node.children.add(comp <= 0 ? mid : mid + 1, c);
        }
    }

    /** Visits every node in pre-order (parent before children). */
    public void traverse(Visitor visitor) {
        traverseR(root, visitor, 0);
    }

    private void traverseR(Node node, Visitor visitor, int level) {
        visitor.visit(node, level);
        for (Node c : node.children)
            traverseR(c, visitor, level + 1);
    }

    /** Visits every node in post-order (children before parent). */
    public void traversePostOrder(Visitor visitor) {
        traversePostOrderR(root, visitor, 0);
    }

    private void traversePostOrderR(Node node, Visitor visitor, int level) {
        for (Node c : node.children)
            traversePostOrderR(c, visitor, level + 1);
        visitor.visit(node, level);
    }

    /** Size statistics of the trie, computed by {@link TrieDictionaryBuilder#stats()}. */
    public static class Stats {
        public int nValues; // number of values in total
        public int nValueBytesPlain; // number of bytes for all values uncompressed
        public int nValueBytesCompressed; // number of values bytes in Trie (compressed)
        public int maxValueLength; // size of longest value in bytes

        // the trie is multi-byte-per-node
        public int mbpn_nNodes; // number of nodes in trie
        public int mbpn_trieDepth; // depth of trie
        public int mbpn_maxFanOut; // the maximum no. children
        public int mbpn_nChildLookups; // number of child lookups during lookup every value once
        public int mbpn_nTotalFanOut; // the sum of fan outs during lookup every value once
        public int mbpn_sizeValueTotal; // the sum of value space in all nodes
        public int mbpn_sizeNoValueBytes; // size of field noValueBytes
        public int mbpn_sizeNoValueBeneath; // size of field noValuesBeneath, depends on cardinality
        public int mbpn_sizeChildOffset; // size of field childOffset, points to first child in flattened array
        public int mbpn_footprint; // MBPN footprint in bytes

        // stats for one-byte-per-node as well, so there's comparison
        public int obpn_sizeValue; // size of value per node, always 1
        public int obpn_sizeNoValuesBeneath; // size of field noValuesBeneath, depends on cardinality
        public int obpn_sizeChildCount; // size of field childCount, enables binary search among children
        public int obpn_sizeChildOffset; // size of field childOffset, points to first child in flattened array
        public int obpn_nNodes; // no. nodes in OBPN trie
        public int obpn_footprint; // OBPN footprint in bytes

        /** Prints all statistics to {@code System.out}. */
        public void print() {
            PrintStream out = System.out;
            out.println("============================================================================");
            out.println("No. values: " + nValues);
            out.println("No. bytes raw: " + nValueBytesPlain);
            out.println("No. bytes in trie: " + nValueBytesCompressed);
            out.println("Longest value length: " + maxValueLength);

            // flatten trie footprint calculation, case of One-Byte-Per-Node
            out.println("----------------------------------------------------------------------------");
            out.println("OBPN node size: " + (obpn_sizeValue + obpn_sizeNoValuesBeneath + obpn_sizeChildCount + obpn_sizeChildOffset) + " = " + obpn_sizeValue + " + " + obpn_sizeNoValuesBeneath + " + " + obpn_sizeChildCount + " + " + obpn_sizeChildOffset);
            out.println("OBPN no. nodes: " + obpn_nNodes);
            out.println("OBPN trie depth: " + maxValueLength);
            out.println("OBPN footprint: " + obpn_footprint + " in bytes");

            // flatten trie footprint calculation, case of Multi-Byte-Per-Node
            out.println("----------------------------------------------------------------------------");
            out.println("MBPN max fan out: " + mbpn_maxFanOut);
            out.println("MBPN no. child lookups: " + mbpn_nChildLookups);
            out.println("MBPN total fan out: " + mbpn_nTotalFanOut);
            // floating-point division: prints NaN rather than failing when there were no lookups
            out.println("MBPN average fan out: " + (double) mbpn_nTotalFanOut / mbpn_nChildLookups);
            out.println("MBPN values size total: " + mbpn_sizeValueTotal);
            out.println("MBPN node size: " + (mbpn_sizeNoValueBytes + mbpn_sizeNoValueBeneath + mbpn_sizeChildOffset) + " = " + mbpn_sizeNoValueBytes + " + " + mbpn_sizeNoValueBeneath + " + " + mbpn_sizeChildOffset);
            out.println("MBPN no. nodes: " + mbpn_nNodes);
            out.println("MBPN trie depth: " + mbpn_trieDepth);
            out.println("MBPN footprint: " + mbpn_footprint + " in bytes");
        }
    }

    /**
     * Computes statistics of the trie and of the dictionary that would be built
     * from it. As a side effect, fills in {@link Node#nValuesBeneath} on every node.
     *
     * @return a freshly computed {@link Stats}
     */
    public Stats stats() {
        // calculate nValuesBeneath bottom-up
        traversePostOrder(new Visitor() {
            @Override
            public void visit(Node n, int level) {
                n.nValuesBeneath = n.isEndOfValue ? 1 : 0;
                for (Node c : n.children)
                    n.nValuesBeneath += c.nValuesBeneath;
            }
        });

        // run stats
        final Stats s = new Stats();
        // lenAtLvl[k] is the part length of the most recently visited node at
        // level k, i.e. of the current node's ancestor at that level (pre-order)
        final ArrayList<Integer> lenAtLvl = new ArrayList<Integer>();
        traverse(new Visitor() {
            @Override
            public void visit(Node n, int level) {
                if (n.isEndOfValue)
                    s.nValues++;
                s.nValueBytesPlain += n.part.length * n.nValuesBeneath;
                s.nValueBytesCompressed += n.part.length;
                s.mbpn_nNodes++;
                if (s.mbpn_trieDepth < level + 1)
                    s.mbpn_trieDepth = level + 1;
                if (n.children.size() > 0) {
                    if (s.mbpn_maxFanOut < n.children.size())
                        s.mbpn_maxFanOut = n.children.size();
                    int childLookups = n.nValuesBeneath - (n.isEndOfValue ? 1 : 0);
                    s.mbpn_nChildLookups += childLookups;
                    s.mbpn_nTotalFanOut += childLookups * n.children.size();
                }

                if (level < lenAtLvl.size())
                    lenAtLvl.set(level, n.part.length);
                else
                    lenAtLvl.add(n.part.length);
                int lenSoFar = 0;
                for (int i = 0; i <= level; i++)
                    lenSoFar += lenAtLvl.get(i);
                if (lenSoFar > s.maxValueLength)
                    s.maxValueLength = lenSoFar;
            }
        });

        // flatten trie footprint calculation, case of One-Byte-Per-Node
        s.obpn_sizeValue = 1;
        s.obpn_sizeNoValuesBeneath = BytesUtil.sizeForValue(s.nValues);
        s.obpn_sizeChildCount = 1;
        s.obpn_sizeChildOffset = 4; // MSB used as isEndOfValue flag
        s.obpn_nNodes = s.nValueBytesCompressed; // no. nodes is the total number of compressed bytes in OBPN
        s.obpn_footprint = s.obpn_nNodes * (s.obpn_sizeValue + s.obpn_sizeNoValuesBeneath + s.obpn_sizeChildCount + s.obpn_sizeChildOffset);
        while (true) { // minimize the offset size to match the footprint
            int t = s.obpn_nNodes * (s.obpn_sizeValue + s.obpn_sizeNoValuesBeneath + s.obpn_sizeChildCount + s.obpn_sizeChildOffset - 1);
            // *2 because MSB of offset is used for isEndOfValue flag
            if (BytesUtil.sizeForValue(t * 2) <= s.obpn_sizeChildOffset - 1) {
                s.obpn_sizeChildOffset--;
                s.obpn_footprint = t;
            } else
                break;
        }

        // flatten trie footprint calculation, case of Multi-Byte-Per-Node
        s.mbpn_sizeValueTotal = s.nValueBytesCompressed;
        s.mbpn_sizeNoValueBytes = 1;
        s.mbpn_sizeNoValueBeneath = BytesUtil.sizeForValue(s.nValues);
        s.mbpn_sizeChildOffset = 4;
        s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeNoValueBeneath + s.mbpn_sizeChildOffset);
        while (true) { // minimize the offset size to match the footprint
            int t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeNoValueBeneath + s.mbpn_sizeChildOffset - 1);
            // *4 because 2 MSB of offset is used for isEndOfValue & isEndChild flag
            if (BytesUtil.sizeForValue(t * 4) <= s.mbpn_sizeChildOffset - 1) {
                s.mbpn_sizeChildOffset--;
                s.mbpn_footprint = t;
            } else
                break;
        }

        return s;
    }

    /** Prints the trie to {@code System.out} for debugging. */
    public void print() {
        print(System.out);
    }

    /**
     * Prints the trie, one node per line, indented by level. Each line shows the
     * node's bytes decoded as UTF-8, the number of values beneath it (if
     * {@link #stats()} has populated it) and a {@code *} when a value ends there.
     */
    public void print(final PrintStream out) {
        traverse(new Visitor() {
            @Override
            public void visit(Node n, int level) {
                for (int i = 0; i < level; i++)
                    out.print(" ");
                // a Charset constant cannot fail the way a charset name lookup can
                out.print(new String(n.part, StandardCharsets.UTF_8));
                out.print(" - ");
                if (n.nValuesBeneath > 0)
                    out.print(n.nValuesBeneath);
                if (n.isEndOfValue)
                    out.print("*");
                out.print("\n");
            }
        });
    }

    /** Growable byte stack; callers must withdraw exactly what they appended. */
    private static final class CompleteParts {
        byte[] data = new byte[4096];
        int current = 0;

        /** Pushes {@code part} on top, doubling the buffer until it fits. */
        public void append(byte[] part) {
            while (current + part.length > data.length)
                expand();

            System.arraycopy(part, 0, data, current, part.length);
            current += part.length;
        }

        /** Pops the last {@code size} bytes. */
        public void withdraw(int size) {
            current -= size;
        }

        /** Returns a copy of the bytes currently on the stack. */
        public byte[] retrieve() {
            return Arrays.copyOf(data, current);
        }

        private void expand() {
            byte[] temp = new byte[2 * data.length];
            System.arraycopy(data, 0, temp, 0, data.length);
            data = temp;
        }
    }

    /**
     * There is a 255 limitation of length for each node's part. We interpolate
     * nodes to satisfy this when a node's part becomes too long (overflow).
     *
     * NOTE(review): the split is done by re-adding the path plus the first 255
     * bytes through addValue(), which marks that prefix as end-of-value; the
     * prefix therefore becomes a dictionary entry itself -- confirm this is
     * intended.
     */
    private void checkOverflowParts(Node node) {
        // iterate over a snapshot of the child list while addValue() restructures the trie
        ArrayList<Node> childrenCopy = new ArrayList<Node>(node.children);
        for (Node child : childrenCopy) {
            if (child.part.length > 255) {
                byte[] first255 = Arrays.copyOf(child.part, 255);

                completeParts.append(node.part);
                completeParts.append(first255);
                byte[] visited = completeParts.retrieve();
                this.addValue(visited);
                completeParts.withdraw(255);
                completeParts.withdraw(node.part.length);
            }
        }

        completeParts.append(node.part); // by here the node.children may have been changed
        for (Node child : node.children) {
            checkOverflowParts(child);
        }
        completeParts.withdraw(node.part.length);
    }

    /**
     * Flatten the trie into a byte array for a minimized memory footprint.
     * Lookup remains fast. Cost is inflexibility to modify (becomes immutable).
     *
     * Flattened node structure is HEAD + NODEs, for each node:
     * <ul>
     * <li>o byte, offset to child node, o = stats.mbpn_sizeChildOffset
     *   <ul>
     *   <li>1 bit, isLastChild flag, the 1st MSB of o</li>
     *   <li>1 bit, isEndOfValue flag, the 2nd MSB of o</li>
     *   </ul>
     * </li>
     * <li>c byte, number of values beneath, c = stats.mbpn_sizeNoValueBeneath</li>
     * <li>1 byte, number of value bytes</li>
     * <li>n byte, value bytes</li>
     * </ul>
     *
     * @param baseId value written into the dictionary header as the base id
     * @return the dictionary wrapping the flattened trie
     */
    public TrieDictionary<T> build(int baseId) {
        byte[] trieBytes = buildTrieBytes(baseId);
        TrieDictionary<T> r = new TrieDictionary<T>(trieBytes);
        return r;
    }

    /**
     * Serializes the trie as header + nodes in breadth-first order, so that the
     * children of a node are contiguous and one offset per node suffices.
     *
     * @throws IllegalStateException if the bytes written do not match the computed footprint
     */
    protected byte[] buildTrieBytes(int baseId) {
        checkOverflowParts(this.root);

        Stats stats = stats();
        int sizeNoValuesBeneath = stats.mbpn_sizeNoValueBeneath;
        int sizeChildOffset = stats.mbpn_sizeChildOffset;

        // write head
        byte[] head;
        try {
            ByteArrayOutputStream byteBuf = new ByteArrayOutputStream();
            DataOutputStream headOut = new DataOutputStream(byteBuf);
            headOut.write(TrieDictionary.HEAD_MAGIC);
            headOut.writeShort(0); // head size, will back fill
            headOut.writeInt(stats.mbpn_footprint); // body size
            headOut.write(sizeChildOffset);
            headOut.write(sizeNoValuesBeneath);
            // writeShort() stores only the low 16 bits of its argument
            // NOTE(review): larger baseId / maxValueLength are silently truncated -- confirm callers stay in range
            headOut.writeShort(baseId);
            headOut.writeShort(stats.maxValueLength);
            headOut.writeUTF(bytesConverter == null ? "" : bytesConverter.getClass().getName());
            headOut.close();
            head = byteBuf.toByteArray();
            BytesUtil.writeUnsigned(head.length, head, TrieDictionary.HEAD_SIZE_I, 2);
        } catch (IOException e) {
            throw new RuntimeException(e); // shall not happen, as we are writing in memory
        }

        byte[] trieBytes = new byte[stats.mbpn_footprint + head.length];
        System.arraycopy(head, 0, trieBytes, 0, head.length);

        // FIFO of nodes whose children still have to be written
        ArrayDeque<Node> open = new ArrayDeque<Node>();
        IdentityHashMap<Node, Integer> offsetMap = new IdentityHashMap<Node, Integer>();

        // write body
        int o = head.length;
        offsetMap.put(root, o);
        o = build_writeNode(root, o, true, sizeNoValuesBeneath, sizeChildOffset, trieBytes);
        if (!root.children.isEmpty())
            open.addLast(root);

        while (!open.isEmpty()) {
            Node parent = open.removeFirst();
            // the parent's children start at the current write position (relative to the body)
            build_overwriteChildOffset(offsetMap.get(parent), o - head.length, sizeChildOffset, trieBytes);
            for (int i = 0; i < parent.children.size(); i++) {
                Node c = parent.children.get(i);
                boolean isLastChild = (i == parent.children.size() - 1);
                offsetMap.put(c, o);
                o = build_writeNode(c, o, isLastChild, sizeNoValuesBeneath, sizeChildOffset, trieBytes);
                if (!c.children.isEmpty())
                    open.addLast(c);
            }
        }

        if (o != trieBytes.length)
            throw new IllegalStateException("Trie size mismatch: wrote " + o + " bytes but expected " + trieBytes.length);
        return trieBytes;
    }

    /** Fills in a node's child offset while preserving the flag bits already set in its first byte. */
    private void build_overwriteChildOffset(int parentOffset, int childOffset, int sizeChildOffset, byte[] trieBytes) {
        int flags = (int) trieBytes[parentOffset] & (TrieDictionary.BIT_IS_LAST_CHILD | TrieDictionary.BIT_IS_END_OF_VALUE);
        BytesUtil.writeUnsigned(childOffset, trieBytes, parentOffset, sizeChildOffset);
        trieBytes[parentOffset] |= flags;
    }

    /**
     * Writes one node at {@code offset}; the child offset itself is left zero and
     * back-filled later by build_overwriteChildOffset().
     *
     * @return the offset just past the written node
     * @throws IllegalStateException if the node's part is longer than 255 bytes
     */
    private int build_writeNode(Node n, int offset, boolean isLastChild, int sizeNoValuesBeneath, int sizeChildOffset, byte[] trieBytes) {
        int o = offset;

        // childOffset
        if (isLastChild)
            trieBytes[o] |= TrieDictionary.BIT_IS_LAST_CHILD;
        if (n.isEndOfValue)
            trieBytes[o] |= TrieDictionary.BIT_IS_END_OF_VALUE;
        o += sizeChildOffset;

        // nValuesBeneath
        BytesUtil.writeUnsigned(n.nValuesBeneath, trieBytes, o, sizeNoValuesBeneath);
        o += sizeNoValuesBeneath;

        // nValueBytes
        if (n.part.length > 255)
            throw new IllegalStateException("Node part is " + n.part.length + " bytes long, which does not fit the 1-byte length field");
        BytesUtil.writeUnsigned(n.part.length, trieBytes, o, 1);
        o++;

        // valueBytes
        System.arraycopy(n.part, 0, trieBytes, o, n.part.length);
        o += n.part.length;

        return o;
    }

}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java ---------------------------------------------------------------------- diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java deleted file mode 100644 index 3769302..0000000 --- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.dict.lookup; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.common.util.Pair; - -/** - * @author yangli9 - * - */ -public class FileTable implements ReadableTable { - - public static final String DELIM_AUTO = "auto"; - public static final String DELIM_COMMA = ","; - - String path; - String delim; - int nColumns; - - public FileTable(String path, int nColumns) { - this(path, DELIM_AUTO, nColumns); - } - - public FileTable(String path, String delim, int nColumns) { - this.path = path; - this.delim = delim; - this.nColumns = nColumns; - } - - @Override - public TableReader getReader() throws IOException { - return new FileTableReader(path, delim, nColumns); - } - - @Override - public TableSignature getSignature() throws IOException { - if (!exists()) - throw new IllegalStateException("Table not exists :" + path); - - try { - Pair<Long, Long> sizeAndLastModified = getSizeAndLastModified(path); - return new TableSignature(path, sizeAndLastModified.getFirst(), sizeAndLastModified.getSecond()); - } catch (FileNotFoundException ex) { - return null; - } - } - - @Override - public boolean exists() throws IOException { - FileSystem fs = HadoopUtil.getFileSystem(path); - return fs.exists(new Path(path)); - } - - @Override - public String toString() { - return path; - } - - public static Pair<Long, Long> getSizeAndLastModified(String path) throws IOException { - FileSystem fs = HadoopUtil.getFileSystem(path); - - // get all contained files if path is directory - ArrayList<FileStatus> allFiles = new ArrayList<>(); - FileStatus status = fs.getFileStatus(new Path(path)); - if (status.isFile()) { - allFiles.add(status); - } else { - FileStatus[] listStatus = fs.listStatus(new 
Path(path)); - allFiles.addAll(Arrays.asList(listStatus)); - } - - long size = 0; - long lastModified = 0; - for (FileStatus file : allFiles) { - size += file.getLen(); - lastModified = Math.max(lastModified, file.getModificationTime()); - } - - return new Pair<Long, Long>(size, lastModified); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java ---------------------------------------------------------------------- diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java deleted file mode 100644 index a6de5df..0000000 --- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.dict.lookup; - -import java.io.BufferedReader; -import java.io.Closeable; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStreamReader; - -import org.apache.commons.lang.StringEscapeUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.Reader; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.common.util.StringSplitter; -import org.apache.kylin.dict.lookup.ReadableTable.TableReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Tables are typically CSV or SEQ file. - * - * @author yangli9 - */ -public class FileTableReader implements TableReader { - - private static final Logger logger = LoggerFactory.getLogger(FileTableReader.class); - private static final char CSV_QUOTE = '"'; - private static final String[] DETECT_DELIMS = new String[] { "\177", "|", "\t", "," }; - - private String filePath; - private String delim; - private RowReader reader; - - private String curLine; - private String[] curColumns; - private int expectedColumnNumber = -1; // helps delimiter detection - - public FileTableReader(String filePath, String delim, int expectedColumnNumber) throws IOException { - this.filePath = filePath; - this.delim = delim; - this.expectedColumnNumber = expectedColumnNumber; - - FileSystem fs = HadoopUtil.getFileSystem(filePath); - - try { - this.reader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, filePath); - - } catch (IOException e) { - if (isExceptionSayingNotSeqFile(e) == false) - throw e; - - this.reader = new CsvRowReader(fs, filePath); - } - } - - private boolean 
isExceptionSayingNotSeqFile(IOException e) { - if (e.getMessage() != null && e.getMessage().contains("not a SequenceFile")) - return true; - - if (e instanceof EOFException) // in case the file is very very small - return true; - - return false; - } - - @Override - public boolean next() throws IOException { - curLine = reader.nextLine(); - curColumns = null; - return curLine != null; - } - - public String getLine() { - return curLine; - } - - @Override - public String[] getRow() { - if (curColumns == null) { - if (FileTable.DELIM_AUTO.equals(delim)) - delim = autoDetectDelim(curLine); - - if (delim == null) - curColumns = new String[] { curLine }; - else - curColumns = split(curLine, delim); - } - return curColumns; - } - - private String[] split(String line, String delim) { - // FIXME CVS line should be parsed considering escapes - String str[] = StringSplitter.split(line, delim); - - // un-escape CSV - if (FileTable.DELIM_COMMA.equals(delim)) { - for (int i = 0; i < str.length; i++) { - str[i] = unescapeCsv(str[i]); - } - } - - return str; - } - - private String unescapeCsv(String str) { - if (str == null || str.length() < 2) - return str; - - str = StringEscapeUtils.unescapeCsv(str); - - // unescapeCsv may not remove the outer most quotes - if (str.charAt(0) == CSV_QUOTE && str.charAt(str.length() - 1) == CSV_QUOTE) - str = str.substring(1, str.length() - 1); - - return str; - } - - @Override - public void close() throws IOException { - if (reader != null) - reader.close(); - } - - private String autoDetectDelim(String line) { - if (expectedColumnNumber > 0) { - for (String delim : DETECT_DELIMS) { - if (StringSplitter.split(line, delim).length == expectedColumnNumber) { - logger.info("Auto detect delim to be '" + delim + "', split line to " + expectedColumnNumber + " columns -- " + line); - return delim; - } - } - } - - logger.info("Auto detect delim to be null, will take THE-WHOLE-LINE as a single value, for " + filePath); - return null; - } - - // 
============================================================================ - - private interface RowReader extends Closeable { - String nextLine() throws IOException; // return null on EOF - } - - private class SeqRowReader implements RowReader { - Reader reader; - Writable key; - Text value; - - SeqRowReader(Configuration hconf, FileSystem fs, String path) throws IOException { - path = HadoopUtil.fixWindowsPath(path); - reader = new Reader(hconf, SequenceFile.Reader.file(new Path(path))); - key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf); - value = new Text(); - } - - @Override - public String nextLine() throws IOException { - boolean hasNext = reader.next(key, value); - if (hasNext) - return Bytes.toString(value.getBytes(), 0, value.getLength()); - else - return null; - } - - @Override - public void close() throws IOException { - reader.close(); - } - } - - private class CsvRowReader implements RowReader { - BufferedReader reader; - - CsvRowReader(FileSystem fs, String path) throws IOException { - path = HadoopUtil.fixWindowsPath(path); - FSDataInputStream in = fs.open(new Path(path)); - reader = new BufferedReader(new InputStreamReader(in, "UTF-8")); - } - - @Override - public String nextLine() throws IOException { - return reader.readLine(); - } - - @Override - public void close() throws IOException { - reader.close(); - } - - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java ---------------------------------------------------------------------- diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java deleted file mode 100644 index 0237ff4..0000000 --- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.dict.lookup; - -import java.io.IOException; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.HiveClient; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.metadata.MetadataManager; -import org.apache.kylin.metadata.model.TableDesc; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - */ -public class HiveTable implements ReadableTable { - - private static final Logger logger = LoggerFactory.getLogger(HiveTable.class); - - final private String database; - final private String hiveTable; - - private HiveClient hiveClient; - - public HiveTable(MetadataManager metaMgr, String table) { - TableDesc tableDesc = metaMgr.getTableDesc(table); - this.database = tableDesc.getDatabase(); - this.hiveTable = tableDesc.getName(); - } - - @Override - public TableReader getReader() throws IOException { - return new HiveTableReader(database, hiveTable); - } - - @Override - public TableSignature getSignature() throws IOException { - try { - String path = computeHDFSLocation(); - Pair<Long, Long> sizeAndLastModified = FileTable.getSizeAndLastModified(path); - long size = sizeAndLastModified.getFirst(); - long lastModified = sizeAndLastModified.getSecond(); - - // for non-native hive 
table, cannot rely on size & last modified on HDFS - if (getHiveClient().isNativeTable(database, hiveTable) == false) { - lastModified = System.currentTimeMillis(); // assume table is ever changing - } - - return new TableSignature(path, size, lastModified); - - } catch (Exception e) { - if (e instanceof IOException) - throw (IOException) e; - else - throw new IOException(e); - } - } - - @Override - public boolean exists() throws IOException { - return true; - } - - private String computeHDFSLocation() throws Exception { - - String override = KylinConfig.getInstanceFromEnv().getOverrideHiveTableLocation(hiveTable); - if (override != null) { - logger.debug("Override hive table location " + hiveTable + " -- " + override); - return override; - } - - return getHiveClient().getHiveTableLocation(database, hiveTable); - } - - @Override - public String toString() { - return "hive: database=[" + database + "], table=[" + hiveTable + "]"; - } - - public HiveClient getHiveClient() { - if (hiveClient == null) { - hiveClient = new HiveClient(); - } - return hiveClient; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTableReader.java ---------------------------------------------------------------------- diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTableReader.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTableReader.java deleted file mode 100644 index b103442..0000000 --- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTableReader.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.dict.lookup; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hive.hcatalog.common.HCatException; -import org.apache.hive.hcatalog.data.HCatRecord; -import org.apache.hive.hcatalog.data.transfer.DataTransferFactory; -import org.apache.hive.hcatalog.data.transfer.HCatReader; -import org.apache.hive.hcatalog.data.transfer.ReadEntity; -import org.apache.hive.hcatalog.data.transfer.ReaderContext; -import org.apache.kylin.dict.lookup.ReadableTable.TableReader; - -/** - * An implementation of TableReader with HCatalog for Hive table. 
- * @author shaoshi - * - */ -public class HiveTableReader implements TableReader { - - private String dbName; - private String tableName; - private int currentSplit = -1; - private ReaderContext readCntxt = null; - private Iterator<HCatRecord> currentHCatRecordItr = null; - private HCatRecord currentHCatRecord; - private int numberOfSplits = 0; - private Map<String, String> partitionKV = null; - - /** - * Constructor for reading whole hive table - * @param dbName - * @param tableName - * @throws IOException - */ - public HiveTableReader(String dbName, String tableName) throws IOException { - this(dbName, tableName, null); - } - - /** - * Constructor for reading a partition of the hive table - * @param dbName - * @param tableName - * @param partitionKV key-value pairs condition on the partition - * @throws IOException - */ - public HiveTableReader(String dbName, String tableName, Map<String, String> partitionKV) throws IOException { - this.dbName = dbName; - this.tableName = tableName; - this.partitionKV = partitionKV; - initialize(); - } - - private void initialize() throws IOException { - try { - this.readCntxt = getHiveReaderContext(dbName, tableName, partitionKV); - } catch (Exception e) { - e.printStackTrace(); - throw new IOException(e); - } - - this.numberOfSplits = readCntxt.numSplits(); - } - - @Override - public boolean next() throws IOException { - - while (currentHCatRecordItr == null || !currentHCatRecordItr.hasNext()) { - currentSplit++; - if (currentSplit == numberOfSplits) { - return false; - } - - currentHCatRecordItr = loadHCatRecordItr(readCntxt, currentSplit); - } - - currentHCatRecord = currentHCatRecordItr.next(); - - return true; - } - - @Override - public String[] getRow() { - List<Object> allFields = currentHCatRecord.getAll(); - List<String> rowValues = new ArrayList<String>(allFields.size()); - for (Object o : allFields) { - rowValues.add(o != null ? 
o.toString() : ""); - } - - return rowValues.toArray(new String[allFields.size()]); - } - - @Override - public void close() throws IOException { - this.readCntxt = null; - this.currentHCatRecordItr = null; - this.currentHCatRecord = null; - this.currentSplit = -1; - } - - public String toString() { - return "hive table reader for: " + dbName + "." + tableName; - } - - private static ReaderContext getHiveReaderContext(String database, String table, Map<String, String> partitionKV) throws Exception { - HiveConf hiveConf = new HiveConf(HiveTableReader.class); - Iterator<Entry<String, String>> itr = hiveConf.iterator(); - Map<String, String> map = new HashMap<String, String>(); - while (itr.hasNext()) { - Entry<String, String> kv = itr.next(); - map.put(kv.getKey(), kv.getValue()); - } - - ReadEntity entity; - if (partitionKV == null || partitionKV.size() == 0) { - entity = new ReadEntity.Builder().withDatabase(database).withTable(table).build(); - } else { - entity = new ReadEntity.Builder().withDatabase(database).withTable(table).withPartition(partitionKV).build(); - } - - HCatReader reader = DataTransferFactory.getHCatReader(entity, map); - ReaderContext cntxt = reader.prepareRead(); - - return cntxt; - } - - private static Iterator<HCatRecord> loadHCatRecordItr(ReaderContext readCntxt, int dataSplit) throws HCatException { - HCatReader currentHCatReader = DataTransferFactory.getHCatReader(readCntxt, dataSplit); - return currentHCatReader.read(); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java ---------------------------------------------------------------------- diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java deleted file mode 100644 index d0039df..0000000 --- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java +++ /dev/null @@ 
-1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.dict.lookup; - -import java.io.IOException; - -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.metadata.model.TableDesc; - -/** - * @author yangli9 - * - */ -public class LookupBytesTable extends LookupTable<ByteArray> { - - public LookupBytesTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException { - super(tableDesc, keyColumns, table); - } - - @Override - protected ByteArray[] convertRow(String[] cols) { - ByteArray[] r = new ByteArray[cols.length]; - for (int i = 0; i < cols.length; i++) { - r[i] = cols[i] == null ? 
null : new ByteArray(Bytes.toBytes(cols[i])); - } - return r; - } - - @Override - protected String toString(ByteArray cell) { - return Bytes.toString(cell.data); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java ---------------------------------------------------------------------- diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java deleted file mode 100644 index aaec5c7..0000000 --- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.dict.lookup; - -import java.io.IOException; - -import org.apache.kylin.metadata.model.TableDesc; - -/** - * @author yangli9 - * - */ -public class LookupStringTable extends LookupTable<String> { - - public LookupStringTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException { - super(tableDesc, keyColumns, table); - } - - @Override - protected String[] convertRow(String[] cols) { - return cols; - } - - @Override - protected String toString(String cell) { - return cell; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java ---------------------------------------------------------------------- diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java deleted file mode 100644 index 12c8bfd..0000000 --- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.dict.lookup; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.kylin.common.util.Array; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.dict.lookup.ReadableTable.TableReader; -import org.apache.kylin.metadata.model.TableDesc; - -import com.google.common.collect.Sets; - -/** - * An in-memory lookup table, in which each cell is an object of type T. The - * table is indexed by specified PK for fast lookup. - * - * @author yangli9 - */ -abstract public class LookupTable<T extends Comparable<T>> { - - protected TableDesc tableDesc; - protected String[] keyColumns; - protected ReadableTable table; - protected ConcurrentHashMap<Array<T>, T[]> data; - - public LookupTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException { - this.tableDesc = tableDesc; - this.keyColumns = keyColumns; - this.table = table; - this.data = new ConcurrentHashMap<Array<T>, T[]>(); - init(); - } - - protected void init() throws IOException { - int[] keyIndex = new int[keyColumns.length]; - for (int i = 0; i < keyColumns.length; i++) { - keyIndex[i] = tableDesc.findColumnByName(keyColumns[i]).getZeroBasedIndex(); - } - - TableReader reader = table.getReader(); - try { - while (reader.next()) { - initRow(reader.getRow(), keyIndex); - } - } finally { - reader.close(); - } - } - - @SuppressWarnings("unchecked") - private void initRow(String[] cols, int[] keyIndex) { - T[] value = convertRow(cols); - T[] keyCols = (T[]) java.lang.reflect.Array.newInstance(value[0].getClass(), keyIndex.length); - for (int i = 0; i < keyCols.length; i++) - keyCols[i] = value[keyIndex[i]]; - - Array<T> key = new Array<T>(keyCols); - - if (data.containsKey(key)) - throw new IllegalStateException("Dup key found, key=" + toString(keyCols) + ", value1=" + toString(data.get(key)) + ", 
value2=" + toString(value)); - - data.put(key, value); - } - - abstract protected T[] convertRow(String[] cols); - - public T[] getRow(Array<T> key) { - return data.get(key); - } - - public Collection<T[]> getAllRows() { - return data.values(); - } - - public List<T> scan(String col, List<T> values, String returnCol) { - ArrayList<T> result = new ArrayList<T>(); - int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex(); - int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex(); - for (T[] row : data.values()) { - if (values.contains(row[colIdx])) - result.add(row[returnIdx]); - } - return result; - } - - public Pair<T, T> mapRange(String col, T beginValue, T endValue, String returnCol) { - int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex(); - int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex(); - T returnBegin = null; - T returnEnd = null; - for (T[] row : data.values()) { - if (between(beginValue, row[colIdx], endValue)) { - T returnValue = row[returnIdx]; - if (returnBegin == null || returnValue.compareTo(returnBegin) < 0) { - returnBegin = returnValue; - } - if (returnEnd == null || returnValue.compareTo(returnEnd) > 0) { - returnEnd = returnValue; - } - } - } - if (returnBegin == null && returnEnd == null) - return null; - else - return new Pair<T, T>(returnBegin, returnEnd); - } - - public Set<T> mapValues(String col, Set<T> values, String returnCol) { - int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex(); - int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex(); - Set<T> result = Sets.newHashSetWithExpectedSize(values.size()); - for (T[] row : data.values()) { - if (values.contains(row[colIdx])) { - result.add(row[returnIdx]); - } - } - return result; - } - - private boolean between(T beginValue, T v, T endValue) { - return (beginValue == null || beginValue.compareTo(v) <= 0) && (endValue == null || v.compareTo(endValue) <= 0); - } - - public String toString() { - 
return "LookupTable [path=" + table + "]"; - } - - protected String toString(T[] cols) { - StringBuilder b = new StringBuilder(); - b.append("["); - for (int i = 0; i < cols.length; i++) { - if (i > 0) - b.append(","); - b.append(toString(cols[i])); - } - b.append("]"); - return b.toString(); - } - - abstract protected String toString(T cell); - - public void dump() { - for (Array<T> key : data.keySet()) { - System.out.println(toString(key.data) + " => " + toString(data.get(key))); - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/dictionary/src/main/java/org/apache/kylin/dict/lookup/ReadableTable.java ---------------------------------------------------------------------- diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/ReadableTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/ReadableTable.java deleted file mode 100644 index c60df21..0000000 --- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/ReadableTable.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.dict.lookup; - -import java.io.Closeable; -import java.io.IOException; - -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; -import com.fasterxml.jackson.annotation.JsonProperty; - -/** - */ -public interface ReadableTable { - - /** Returns a reader to read the table. */ - public TableReader getReader() throws IOException; - - /** Used to detect table modifications mainly. Return null in case table does not exist. */ - public TableSignature getSignature() throws IOException; - - public boolean exists() throws IOException; - - public interface TableReader extends Closeable { - - /** Move to the next row, return false if no more record. */ - public boolean next() throws IOException; - - /** Get the current row. */ - public String[] getRow(); - - } - - // ============================================================================ - - @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) - public class TableSignature { - - @JsonProperty("path") - private String path; - @JsonProperty("size") - private long size; - @JsonProperty("last_modified_time") - private long lastModifiedTime; - - // for JSON serialization - public TableSignature() { - } - - public TableSignature(String path, long size, long lastModifiedTime) { - super(); - this.path = path; - this.size = size; - this.lastModifiedTime = lastModifiedTime; - } - - public void setPath(String path) { - this.path = path; - } - - public void setSize(long size) { - this.size = size; - } - - public void setLastModifiedTime(long lastModifiedTime) { - this.lastModifiedTime = lastModifiedTime; - } - - public String getPath() { - return path; - } - - public long getSize() { - return size; - } - - public long getLastModifiedTime() { - return lastModifiedTime; - } - - @Override - public int hashCode() { - final int 
prime = 31; - int result = 1; - result = prime * result + (int) (lastModifiedTime ^ (lastModifiedTime >>> 32)); - result = prime * result + ((path == null) ? 0 : path.hashCode()); - result = prime * result + (int) (size ^ (size >>> 32)); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - TableSignature other = (TableSignature) obj; - if (lastModifiedTime != other.lastModifiedTime) - return false; - if (path == null) { - if (other.path != null) - return false; - } else if (!path.equals(other.path)) - return false; - if (size != other.size) - return false; - return true; - } - - @Override - public String toString() { - return "FileSignature [path=" + path + ", size=" + size + ", lastModifiedTime=" + lastModifiedTime + "]"; - } - - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java ---------------------------------------------------------------------- diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java deleted file mode 100644 index 5f6b664..0000000 --- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.dict.lookup; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.dict.lookup.ReadableTable.TableSignature; -import org.apache.kylin.metadata.MetadataManager; -import org.apache.kylin.metadata.model.TableDesc; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author yangli9 - */ -public class SnapshotManager { - - private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class); - - // static cached instances - private static final ConcurrentHashMap<KylinConfig, SnapshotManager> SERVICE_CACHE = new ConcurrentHashMap<KylinConfig, SnapshotManager>(); - - public static SnapshotManager getInstance(KylinConfig config) { - SnapshotManager r = SERVICE_CACHE.get(config); - if (r == null) { - r = new SnapshotManager(config); - SERVICE_CACHE.put(config, r); - if (SERVICE_CACHE.size() > 1) { - logger.warn("More than one singleton exist"); - } - } - return r; - } - - // ============================================================================ - - private KylinConfig config; - private ConcurrentHashMap<String, SnapshotTable> snapshotCache; // resource path ==> SnapshotTable - - private SnapshotManager(KylinConfig config) { - this.config = config; - snapshotCache = new ConcurrentHashMap<String, SnapshotTable>(); - } - - public void wipeoutCache() { - snapshotCache.clear(); - } - - public SnapshotTable 
getSnapshotTable(String resourcePath) throws IOException { - SnapshotTable r = snapshotCache.get(resourcePath); - if (r == null) { - r = load(resourcePath, true); - snapshotCache.put(resourcePath, r); - } - return r; - } - - public void removeSnapshot(String resourcePath) throws IOException { - ResourceStore store = MetadataManager.getInstance(this.config).getStore(); - store.deleteResource(resourcePath); - snapshotCache.remove(resourcePath); - } - - public SnapshotTable buildSnapshot(ReadableTable table, TableDesc tableDesc) throws IOException { - SnapshotTable snapshot = new SnapshotTable(table); - snapshot.updateRandomUuid(); - - String dup = checkDupByInfo(snapshot); - if (dup != null) { - logger.info("Identical input " + table.getSignature() + ", reuse existing snapshot at " + dup); - return getSnapshotTable(dup); - } - - if (snapshot.getSignature().getSize() / 1024 / 1024 > config.getTableSnapshotMaxMB()) { - throw new IllegalStateException("Table snapshot should be no greater than " + config.getTableSnapshotMaxMB() // - + " MB, but " + tableDesc + " size is " + snapshot.getSignature().getSize()); - } - - snapshot.takeSnapshot(table, tableDesc); - - return trySaveNewSnapshot(snapshot); - } - - public SnapshotTable trySaveNewSnapshot(SnapshotTable snapshotTable) throws IOException { - - String dupTable = checkDupByContent(snapshotTable); - if (dupTable != null) { - logger.info("Identical snapshot content " + snapshotTable + ", reuse existing snapshot at " + dupTable); - return getSnapshotTable(dupTable); - } - - save(snapshotTable); - snapshotCache.put(snapshotTable.getResourcePath(), snapshotTable); - - return snapshotTable; - } - - private String checkDupByInfo(SnapshotTable snapshot) throws IOException { - ResourceStore store = MetadataManager.getInstance(this.config).getStore(); - String resourceDir = snapshot.getResourceDir(); - ArrayList<String> existings = store.listResources(resourceDir); - if (existings == null) - return null; - - TableSignature sig = 
snapshot.getSignature(); - for (String existing : existings) { - SnapshotTable existingTable = load(existing, false); // skip cache, - // direct load from store - if (existingTable != null && sig.equals(existingTable.getSignature())) - return existing; - } - - return null; - } - - private String checkDupByContent(SnapshotTable snapshot) throws IOException { - ResourceStore store = MetadataManager.getInstance(this.config).getStore(); - String resourceDir = snapshot.getResourceDir(); - ArrayList<String> existings = store.listResources(resourceDir); - if (existings == null) - return null; - - for (String existing : existings) { - SnapshotTable existingTable = load(existing, true); // skip cache, direct load from store - if (existingTable != null && existingTable.equals(snapshot)) - return existing; - } - - return null; - } - - private void save(SnapshotTable snapshot) throws IOException { - ResourceStore store = MetadataManager.getInstance(this.config).getStore(); - String path = snapshot.getResourcePath(); - store.putResource(path, snapshot, SnapshotTableSerializer.FULL_SERIALIZER); - } - - private SnapshotTable load(String resourcePath, boolean loadData) throws IOException { - logger.info("Loading snapshotTable from " + resourcePath + ", with loadData: " + loadData); - ResourceStore store = MetadataManager.getInstance(this.config).getStore(); - - SnapshotTable table = store.getResource(resourcePath, SnapshotTable.class, loadData ? 
SnapshotTableSerializer.FULL_SERIALIZER : SnapshotTableSerializer.INFO_SERIALIZER); - - if (loadData) - logger.debug("Loaded snapshot at " + resourcePath); - - return table; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java ---------------------------------------------------------------------- diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java deleted file mode 100644 index cc3c637..0000000 --- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.dict.lookup; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import org.apache.commons.lang.ArrayUtils; -import org.apache.hadoop.fs.Path; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.persistence.RootPersistentEntity; -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.dict.StringBytesConverter; -import org.apache.kylin.dict.TrieDictionary; -import org.apache.kylin.dict.TrieDictionaryBuilder; -import org.apache.kylin.metadata.model.TableDesc; - -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; -import com.fasterxml.jackson.annotation.JsonProperty; - -/** - * @author yangli9 - */ -@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) -public class SnapshotTable extends RootPersistentEntity implements ReadableTable { - - @JsonProperty("signature") - private TableSignature signature; - @JsonProperty("useDictionary") - private boolean useDictionary; - - private List<int[]> rowIndices = Collections.emptyList(); - private Dictionary<String> dict; - - // default constructor for JSON serialization - public SnapshotTable() { - } - - SnapshotTable(ReadableTable table) throws IOException { - this.signature = table.getSignature(); - this.useDictionary = true; - } - - public void takeSnapshot(ReadableTable table, TableDesc tableDesc) throws IOException { - this.signature = table.getSignature(); - - int maxIndex = tableDesc.getMaxColumnIndex(); - - TrieDictionaryBuilder<String> b = new TrieDictionaryBuilder<String>(new StringBytesConverter()); - - TableReader reader = table.getReader(); - while (reader.next()) { - String[] row = reader.getRow(); - if 
(row.length <= maxIndex) { - throw new IllegalStateException("Bad hive table row, " + tableDesc + " expect " + (maxIndex + 1) + " columns, but got " + Arrays.toString(row)); - } - - for (String cell : row) { - if (cell != null) - b.addValue(cell); - } - } - - this.dict = b.build(0); - - reader = table.getReader(); - ArrayList<int[]> allRowIndices = new ArrayList<int[]>(); - while (reader.next()) { - String[] row = reader.getRow(); - int[] rowIndex = new int[row.length]; - for (int i = 0; i < row.length; i++) { - rowIndex[i] = dict.getIdFromValue(row[i]); - } - allRowIndices.add(rowIndex); - } - this.rowIndices = allRowIndices; - } - - public String getResourcePath() { - return ResourceStore.SNAPSHOT_RESOURCE_ROOT + "/" + new Path(signature.getPath()).getName() + "/" + uuid + ".snapshot"; - } - - public String getResourceDir() { - return ResourceStore.SNAPSHOT_RESOURCE_ROOT + "/" + new Path(signature.getPath()).getName(); - } - - @Override - public TableReader getReader() throws IOException { - return new TableReader() { - - int i = -1; - - @Override - public boolean next() throws IOException { - i++; - return i < rowIndices.size(); - } - - @Override - public String[] getRow() { - int[] rowIndex = rowIndices.get(i); - String[] row = new String[rowIndex.length]; - for (int x = 0; x < row.length; x++) { - row[x] = dict.getValueFromId(rowIndex[x]); - } - return row; - } - - @Override - public void close() throws IOException { - } - }; - } - - @Override - public TableSignature getSignature() throws IOException { - return signature; - } - - @Override - public boolean exists() throws IOException { - return true; - } - - /** - * a naive implementation - * - * @return - */ - @Override - public int hashCode() { - int[] parts = new int[this.rowIndices.size()]; - for (int i = 0; i < parts.length; ++i) - parts[i] = Arrays.hashCode(this.rowIndices.get(i)); - return Arrays.hashCode(parts); - } - - @Override - public boolean equals(Object o) { - if ((o instanceof SnapshotTable) == 
false) - return false; - SnapshotTable that = (SnapshotTable) o; - - //compare row by row - if (this.rowIndices.size() != that.rowIndices.size()) - return false; - for (int i = 0; i < this.rowIndices.size(); ++i) { - if (!ArrayUtils.isEquals(this.rowIndices.get(i), that.rowIndices.get(i))) - return false; - } - return true; - } - - void writeData(DataOutput out) throws IOException { - out.writeInt(rowIndices.size()); - if (rowIndices.size() > 0) { - int n = rowIndices.get(0).length; - out.writeInt(n); - - if (this.useDictionary == true) { - dict.write(out); - for (int i = 0; i < rowIndices.size(); i++) { - int[] row = rowIndices.get(i); - for (int j = 0; j < n; j++) { - out.writeInt(row[j]); - } - } - - } else { - for (int i = 0; i < rowIndices.size(); i++) { - int[] row = rowIndices.get(i); - for (int j = 0; j < n; j++) { - out.writeUTF(dict.getValueFromId(row[j])); - } - } - } - } - } - - void readData(DataInput in) throws IOException { - int rowNum = in.readInt(); - if (rowNum > 0) { - int n = in.readInt(); - rowIndices = new ArrayList<int[]>(rowNum); - - if (this.useDictionary == true) { - this.dict = new TrieDictionary<String>(); - dict.readFields(in); - - for (int i = 0; i < rowNum; i++) { - int[] row = new int[n]; - this.rowIndices.add(row); - for (int j = 0; j < n; j++) { - row[j] = in.readInt(); - } - } - } else { - List<String[]> rows = new ArrayList<String[]>(rowNum); - TrieDictionaryBuilder<String> b = new TrieDictionaryBuilder<String>(new StringBytesConverter()); - - for (int i = 0; i < rowNum; i++) { - String[] row = new String[n]; - rows.add(row); - for (int j = 0; j < n; j++) { - row[j] = in.readUTF(); - if (row[j] != null) - b.addValue(row[j]); - } - } - this.dict = b.build(0); - for (String[] row : rows) { - int[] rowIndex = new int[n]; - for (int i = 0; i < n; i++) { - rowIndex[i] = dict.getIdFromValue(row[i]); - } - this.rowIndices.add(rowIndex); - } - } - } - } - -} 
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTableSerializer.java ---------------------------------------------------------------------- diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTableSerializer.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTableSerializer.java deleted file mode 100644 index 0152af6..0000000 --- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTableSerializer.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.dict.lookup; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -import org.apache.kylin.common.persistence.Serializer; -import org.apache.kylin.common.util.JsonUtil; - -/* - * Copyright 2013-2014 eBay Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */

/**
 * Serializer for {@link SnapshotTable}. The stream starts with the table rendered as a
 * JSON string (written with writeUTF); unless the serializer is info-only, the table's
 * row data follows.
 *
 * @author yangli9
 */
public class SnapshotTableSerializer implements Serializer<SnapshotTable> {

    /** Handles the JSON header followed by the row data. */
    public static final SnapshotTableSerializer FULL_SERIALIZER = new SnapshotTableSerializer(false);
    /** Handles the JSON header only. */
    public static final SnapshotTableSerializer INFO_SERIALIZER = new SnapshotTableSerializer(true);

    private final boolean infoOnly;

    SnapshotTableSerializer(boolean infoOnly) {
        this.infoOnly = infoOnly;
    }

    /**
     * Writes the JSON header of obj and, unless info-only, its row data.
     */
    @Override
    public void serialize(SnapshotTable obj, DataOutputStream out) throws IOException {
        out.writeUTF(JsonUtil.writeValueAsIndentString(obj));
        if (infoOnly) {
            return;
        }
        obj.writeData(out);
    }

    /**
     * Reads the JSON header into a new SnapshotTable and, unless info-only, its row data.
     */
    @Override
    public SnapshotTable deserialize(DataInputStream in) throws IOException {
        SnapshotTable table = JsonUtil.readValue(in.readUTF(), SnapshotTable.class);
        if (!infoOnly) {
            table.readData(in);
        }
        return table;
    }

}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/dictionary/src/test/java/org/apache/kylin/dict/DateStrDictionaryTest.java ---------------------------------------------------------------------- diff --git a/dictionary/src/test/java/org/apache/kylin/dict/DateStrDictionaryTest.java b/dictionary/src/test/java/org/apache/kylin/dict/DateStrDictionaryTest.java deleted file mode 100644 index 5542422..0000000 --- a/dictionary/src/test/java/org/apache/kylin/dict/DateStrDictionaryTest.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements.
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */

package org.apache.kylin.dict;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;

import org.junit.Before;
import org.junit.Test;

/**
 * Exercises DateStrDictionary: id boundaries, the null value, value/id round trips and
 * rejection of illegal arguments.
 */
public class DateStrDictionaryTest {

    DateStrDictionary dict;

    @Before
    public void setup() {
        dict = new DateStrDictionary();
    }

    @Test
    public void testMinMaxId() {
        assertEquals(0, dict.getIdFromValue("0000-01-01"));
        assertEquals(DateStrDictionary.ID_9999_12_31, dict.getIdFromValue("9999-12-31"));

        assertIdRejected(-2); // -1 is id for NULL
        assertIdRejected(DateStrDictionary.ID_9999_12_31 + 1);
        assertValueRejected("10000-1-1");
    }

    @Test
    public void testNull() {
        int nullId = dict.getIdFromValue(null);
        assertNull(dict.getValueFromId(nullId));
        int nullId2 = dict.getIdFromValueBytes(null, 0, 0);
        // expected value first, so a failure message reports the right roles
        assertEquals(-1, dict.getValueBytesFromId(nullId2, null, 0));
        assertEquals(nullId, nullId2);
    }

    @Test
    public void test() {
        checkPair("0001-01-01");
        checkPair("1970-01-02");
        checkPair("1975-06-24");
        checkPair("2024-10-04");
        checkPair("9999-12-31");
    }

    @Test
    public void testIllegalArgument() {
        assertValueRejected("abcd");
        assertIdRejected(-2);
    }

    /** Asserts that looking up the value of the given id throws IllegalArgumentException. */
    private void assertIdRejected(int id) {
        try {
            dict.getValueFromId(id);
            fail("IllegalArgumentException expected");
        } catch (IllegalArgumentException e) {
            // good
        }
    }

    /** Asserts that looking up the id of the given value throws IllegalArgumentException. */
    private void assertValueRejected(String value) {
        try {
            dict.getIdFromValue(value);
            fail("IllegalArgumentException expected");
        } catch (IllegalArgumentException e) {
            // good
        }
    }

    /** Asserts that the date string maps to an id that maps back to the same string. */
    private void checkPair(String dateStr) {
        int id = dict.getIdFromValue(dateStr);
        String dateStrBack = dict.getValueFromId(id);
        assertEquals(dateStr, dateStrBack);
    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/dictionary/src/test/java/org/apache/kylin/dict/HiveTableReaderTest.java ---------------------------------------------------------------------- diff --git a/dictionary/src/test/java/org/apache/kylin/dict/HiveTableReaderTest.java b/dictionary/src/test/java/org/apache/kylin/dict/HiveTableReaderTest.java deleted file mode 100644 index 6594f17..0000000 --- a/dictionary/src/test/java/org/apache/kylin/dict/HiveTableReaderTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */

package org.apache.kylin.dict;

import java.io.IOException;

import org.apache.commons.lang.ArrayUtils;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.dict.lookup.HiveTableReader;
import org.junit.Assert;
import org.junit.Test;

/**
 * This test case needs the Hive runtime; please run it with the sandbox. It is in the
 * exclude list of the default profile in pom.xml.
 *
 * @author shaoshi
 */
public class HiveTableReaderTest extends HBaseMetadataTestCase {

    /**
     * Reads every row of default.test_kylin_fact, checking that each row has 9 columns and
     * that 10000 rows are returned in total.
     */
    @Test
    public void test() throws IOException {
        HiveTableReader reader = new HiveTableReader("default", "test_kylin_fact");
        int rowNumber = 0;
        try {
            while (reader.next()) {
                String[] row = reader.getRow();
                Assert.assertEquals(9, row.length);
                System.out.println(ArrayUtils.toString(row));
                rowNumber++;
            }
        } finally {
            // release the reader even when an assertion above fails
            reader.close();
        }

        Assert.assertEquals(10000, rowNumber);
    }
}
