http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/ZeroCopyShims.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/ZeroCopyShims.java b/java/core/src/java/org/apache/orc/impl/ZeroCopyShims.java new file mode 100644 index 0000000..de02c8b --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/ZeroCopyShims.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.impl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.EnumSet; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.ReadOption; +import org.apache.hadoop.io.ByteBufferPool; + +class ZeroCopyShims { + private static final class ByteBufferPoolAdapter implements ByteBufferPool { + private HadoopShims.ByteBufferPoolShim pool; + + public ByteBufferPoolAdapter(HadoopShims.ByteBufferPoolShim pool) { + this.pool = pool; + } + + @Override + public final ByteBuffer getBuffer(boolean direct, int length) { + return this.pool.getBuffer(direct, length); + } + + @Override + public final void putBuffer(ByteBuffer buffer) { + this.pool.putBuffer(buffer); + } + } + + private static final class ZeroCopyAdapter implements HadoopShims.ZeroCopyReaderShim { + private final FSDataInputStream in; + private final ByteBufferPoolAdapter pool; + private final static EnumSet<ReadOption> CHECK_SUM = EnumSet + .noneOf(ReadOption.class); + private final static EnumSet<ReadOption> NO_CHECK_SUM = EnumSet + .of(ReadOption.SKIP_CHECKSUMS); + + public ZeroCopyAdapter(FSDataInputStream in, + HadoopShims.ByteBufferPoolShim poolshim) { + this.in = in; + if (poolshim != null) { + pool = new ByteBufferPoolAdapter(poolshim); + } else { + pool = null; + } + } + + public final ByteBuffer readBuffer(int maxLength, boolean verifyChecksums) + throws IOException { + EnumSet<ReadOption> options = NO_CHECK_SUM; + if (verifyChecksums) { + options = CHECK_SUM; + } + return this.in.read(this.pool, maxLength, options); + } + + public final void releaseBuffer(ByteBuffer buffer) { + this.in.releaseBuffer(buffer); + } + + @Override + public final void close() throws IOException { + this.in.close(); + } + } + + public static HadoopShims.ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, + HadoopShims.ByteBufferPoolShim pool) throws IOException { + return new ZeroCopyAdapter(in, pool); + } + +}
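For context, the two adapters above can be exercised roughly as follows. This is a minimal sketch and not part of the patch: the on-demand pool is hypothetical (it never actually recycles buffers), the sketch assumes it lives in org.apache.orc.impl because ZeroCopyShims is package-private, and it assumes HadoopShims.ZeroCopyReaderShim declares the readBuffer/releaseBuffer methods that ZeroCopyAdapter implements.

package org.apache.orc.impl;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;

// Hypothetical pool: allocates on demand and never recycles. A production
// ByteBufferPoolShim would cache released buffers keyed by size and directness.
class OnDemandPool implements HadoopShims.ByteBufferPoolShim {
  public ByteBuffer getBuffer(boolean direct, int length) {
    return direct ? ByteBuffer.allocateDirect(length) : ByteBuffer.allocate(length);
  }
  public void putBuffer(ByteBuffer buffer) {
    // intentionally a no-op in this sketch
  }
}

class ZeroCopyReadExample {
  // Counts the bytes of a stream through the zero-copy path, skipping checksums.
  static long countBytes(FSDataInputStream in) throws IOException {
    HadoopShims.ZeroCopyReaderShim reader =
        ZeroCopyShims.getZeroCopyReader(in, new OnDemandPool());
    try {
      long total = 0;
      ByteBuffer buf;
      // readBuffer returns at most maxLength bytes per call and null at EOF,
      // mirroring FSDataInputStream.read(pool, maxLength, options) above.
      while ((buf = reader.readBuffer(256 * 1024, false)) != null) {
        total += buf.remaining();
        reader.releaseBuffer(buf);  // hand the buffer back once it is consumed
      }
      return total;
    } finally {
      reader.close();
    }
  }
}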
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/ZlibCodec.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/ZlibCodec.java b/java/core/src/java/org/apache/orc/impl/ZlibCodec.java new file mode 100644 index 0000000..5f648a8 --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/ZlibCodec.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.impl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.EnumSet; +import java.util.zip.DataFormatException; +import java.util.zip.Deflater; +import java.util.zip.Inflater; + +import javax.annotation.Nullable; + +import org.apache.hadoop.io.compress.DirectDecompressor; +import org.apache.orc.CompressionCodec; + +public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec { + private static final HadoopShims SHIMS = HadoopShims.Factory.get(); + private Boolean direct = null; + + private final int level; + private final int strategy; + + public ZlibCodec() { + level = Deflater.DEFAULT_COMPRESSION; + strategy = Deflater.DEFAULT_STRATEGY; + } + + private ZlibCodec(int level, int strategy) { + this.level = level; + this.strategy = strategy; + } + + @Override + public boolean compress(ByteBuffer in, ByteBuffer out, + ByteBuffer overflow) throws IOException { + Deflater deflater = new Deflater(level, true); + deflater.setStrategy(strategy); + int length = in.remaining(); + deflater.setInput(in.array(), in.arrayOffset() + in.position(), length); + deflater.finish(); + int outSize = 0; + int offset = out.arrayOffset() + out.position(); + while (!deflater.finished() && (length > outSize)) { + int size = deflater.deflate(out.array(), offset, out.remaining()); + out.position(size + out.position()); + outSize += size; + offset += size; + // if we run out of space in the out buffer, use the overflow + if (out.remaining() == 0) { + if (overflow == null) { + deflater.end(); + return false; + } + out = overflow; + offset = out.arrayOffset() + out.position(); + } + } + deflater.end(); + return length > outSize; + } + + @Override + public void decompress(ByteBuffer in, ByteBuffer out) throws IOException { + + if(in.isDirect() && out.isDirect()) { + directDecompress(in, out); + return; + } + + Inflater inflater = new Inflater(true); + inflater.setInput(in.array(), in.arrayOffset() + in.position(), + in.remaining()); + while (!(inflater.finished() || inflater.needsDictionary() || + inflater.needsInput())) { + try { + int count = inflater.inflate(out.array(), + out.arrayOffset() + out.position(), + out.remaining()); + out.position(count + out.position()); + } catch (DataFormatException dfe) { + throw new 
IOException("Bad compression data", dfe); + } + } + out.flip(); + inflater.end(); + in.position(in.limit()); + } + + @Override + public boolean isAvailable() { + if (direct == null) { + // see nowrap option in new Inflater(boolean) which disables zlib headers + try { + if (SHIMS.getDirectDecompressor( + HadoopShims.DirectCompressionType.ZLIB_NOHEADER) != null) { + direct = Boolean.valueOf(true); + } else { + direct = Boolean.valueOf(false); + } + } catch (UnsatisfiedLinkError ule) { + direct = Boolean.valueOf(false); + } + } + return direct.booleanValue(); + } + + @Override + public void directDecompress(ByteBuffer in, ByteBuffer out) + throws IOException { + HadoopShims.DirectDecompressor decompressShim = + SHIMS.getDirectDecompressor(HadoopShims.DirectCompressionType.ZLIB_NOHEADER); + decompressShim.decompress(in, out); + out.flip(); // flip for read + } + + @Override + public CompressionCodec modify(@Nullable EnumSet<Modifier> modifiers) { + + if (modifiers == null) { + return this; + } + + int l = this.level; + int s = this.strategy; + + for (Modifier m : modifiers) { + switch (m) { + case BINARY: + /* filtered == less LZ77, more huffman */ + s = Deflater.FILTERED; + break; + case TEXT: + s = Deflater.DEFAULT_STRATEGY; + break; + case FASTEST: + // deflate_fast looking for 8 byte patterns + l = Deflater.BEST_SPEED; + break; + case FAST: + // deflate_fast looking for 16 byte patterns + l = Deflater.BEST_SPEED + 1; + break; + case DEFAULT: + // deflate_slow looking for 128 byte patterns + l = Deflater.DEFAULT_COMPRESSION; + break; + default: + break; + } + } + return new ZlibCodec(l, s); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/tools/FileDump.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/tools/FileDump.java b/java/core/src/java/org/apache/orc/tools/FileDump.java new file mode 100644 index 0000000..e32027f --- /dev/null +++ b/java/core/src/java/org/apache/orc/tools/FileDump.java @@ -0,0 +1,934 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.orc.tools; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintStream; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.orc.BloomFilterIO; +import org.apache.orc.ColumnStatistics; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; +import org.apache.orc.Writer; +import org.apache.orc.impl.AcidStats; +import org.apache.orc.impl.ColumnStatisticsImpl; +import org.apache.orc.impl.OrcAcidUtils; +import org.apache.orc.impl.OrcIndex; +import org.apache.orc.OrcProto; +import org.apache.orc.StripeInformation; +import org.apache.orc.StripeStatistics; +import org.apache.orc.impl.RecordReaderImpl; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONWriter; + +import com.google.common.base.Joiner; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; + +/** + * A tool for printing out the file structure of ORC files. 
+ */ +public final class FileDump { + public static final String UNKNOWN = "UNKNOWN"; + public static final String SEPARATOR = Strings.repeat("_", 120) + "\n"; + public static final int DEFAULT_BLOCK_SIZE = 256 * 1024 * 1024; + public static final String DEFAULT_BACKUP_PATH = System.getProperty("java.io.tmpdir"); + public static final PathFilter HIDDEN_AND_SIDE_FILE_FILTER = new PathFilter() { + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith(".") && !name.endsWith( + OrcAcidUtils.DELTA_SIDE_FILE_SUFFIX); + } + }; + + // not used + private FileDump() { + } + + public static void main(String[] args) throws Exception { + Configuration conf = new Configuration(); + + List<Integer> rowIndexCols = null; + Options opts = createOptions(); + CommandLine cli = new GnuParser().parse(opts, args); + + if (cli.hasOption('h')) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("orcfiledump", opts); + return; + } + + boolean dumpData = cli.hasOption('d'); + boolean recover = cli.hasOption("recover"); + boolean skipDump = cli.hasOption("skip-dump"); + String backupPath = DEFAULT_BACKUP_PATH; + if (cli.hasOption("backup-path")) { + backupPath = cli.getOptionValue("backup-path"); + } + + if (cli.hasOption("r")) { + String[] colStrs = cli.getOptionValue("r").split(","); + rowIndexCols = new ArrayList<Integer>(colStrs.length); + for (String colStr : colStrs) { + rowIndexCols.add(Integer.parseInt(colStr)); + } + } + + boolean printTimeZone = cli.hasOption('t'); + boolean jsonFormat = cli.hasOption('j'); + String[] files = cli.getArgs(); + if (files.length == 0) { + System.err.println("Error : ORC files are not specified"); + return; + } + + // if the specified path is directory, iterate through all files and print the file dump + List<String> filesInPath = Lists.newArrayList(); + for (String filename : files) { + Path path = new Path(filename); + filesInPath.addAll(getAllFilesInPath(path, conf)); + } + + if (dumpData) { + printData(filesInPath, conf); + } else if (recover && skipDump) { + recoverFiles(filesInPath, conf, backupPath); + } else { + if (jsonFormat) { + boolean prettyPrint = cli.hasOption('p'); + JsonFileDump.printJsonMetaData(filesInPath, conf, rowIndexCols, prettyPrint, printTimeZone); + } else { + printMetaData(filesInPath, conf, rowIndexCols, printTimeZone, recover, backupPath); + } + } + } + + /** + * This method returns an ORC reader object if the specified file is readable. If the specified + * file has side file (_flush_length) file, then max footer offset will be read from the side + * file and orc reader will be created from that offset. Since both data file and side file + * use hflush() for flushing the data, there could be some inconsistencies and both files could be + * out-of-sync. Following are the cases under which null will be returned + * + * 1) If the file specified by path or its side file is still open for writes + * 2) If *_flush_length file does not return any footer offset + * 3) If *_flush_length returns a valid footer offset but the data file is not readable at that + * position (incomplete data file) + * 4) If *_flush_length file length is not a multiple of 8, then reader will be created from + * previous valid footer. 
If there is no such footer (file length > 0 and < 8), then null will + * be returned + * + * Also, if this method detects any file corruption (mismatch between data file and side file) + * then it will add the corresponding file to the specified input list for corrupted files. + * + * In all other cases, where the file is readable this method will return a reader object. + * + * @param path - file to get reader for + * @param conf - configuration object + * @param corruptFiles - fills this list with all possible corrupted files + * @return - reader for the specified file or null + * @throws IOException + */ + static Reader getReader(final Path path, final Configuration conf, + final List<String> corruptFiles) throws IOException { + FileSystem fs = path.getFileSystem(conf); + long dataFileLen = fs.getFileStatus(path).getLen(); + System.err.println("Processing data file " + path + " [length: " + dataFileLen + "]"); + Path sideFile = OrcAcidUtils.getSideFile(path); + final boolean sideFileExists = fs.exists(sideFile); + boolean openDataFile = false; + boolean openSideFile = false; + if (fs instanceof DistributedFileSystem) { + DistributedFileSystem dfs = (DistributedFileSystem) fs; + openDataFile = !dfs.isFileClosed(path); + openSideFile = sideFileExists && !dfs.isFileClosed(sideFile); + } + + if (openDataFile || openSideFile) { + if (openDataFile && openSideFile) { + System.err.println("Unable to perform file dump as " + path + " and " + sideFile + + " are still open for writes."); + } else if (openSideFile) { + System.err.println("Unable to perform file dump as " + sideFile + + " is still open for writes."); + } else { + System.err.println("Unable to perform file dump as " + path + + " is still open for writes."); + } + + return null; + } + + Reader reader = null; + if (sideFileExists) { + final long maxLen = OrcAcidUtils.getLastFlushLength(fs, path); + final long sideFileLen = fs.getFileStatus(sideFile).getLen(); + System.err.println("Found flush length file " + sideFile + + " [length: " + sideFileLen + ", maxFooterOffset: " + maxLen + "]"); + // no offsets read from side file + if (maxLen == -1) { + + // if data file is larger than last flush length, then additional data could be recovered + if (dataFileLen > maxLen) { + System.err.println("Data file has more data than max footer offset:" + maxLen + + ". Adding data file to recovery list."); + if (corruptFiles != null) { + corruptFiles.add(path.toUri().toString()); + } + } + return null; + } + + try { + reader = OrcFile.createReader(path, OrcFile.readerOptions(conf).maxLength(maxLen)); + + // if data file is larger than last flush length, then additional data could be recovered + if (dataFileLen > maxLen) { + System.err.println("Data file has more data than max footer offset:" + maxLen + + ". Adding data file to recovery list."); + if (corruptFiles != null) { + corruptFiles.add(path.toUri().toString()); + } + } + } catch (Exception e) { + if (corruptFiles != null) { + corruptFiles.add(path.toUri().toString()); + } + System.err.println("Unable to read data from max footer offset." 
+ + " Adding data file to recovery list."); + return null; + } + } else { + reader = OrcFile.createReader(path, OrcFile.readerOptions(conf)); + } + + return reader; + } + + public static Collection<String> getAllFilesInPath(final Path path, + final Configuration conf) throws IOException { + List<String> filesInPath = Lists.newArrayList(); + FileSystem fs = path.getFileSystem(conf); + FileStatus fileStatus = fs.getFileStatus(path); + if (fileStatus.isDir()) { + FileStatus[] fileStatuses = fs.listStatus(path, HIDDEN_AND_SIDE_FILE_FILTER); + for (FileStatus fileInPath : fileStatuses) { + if (fileInPath.isDir()) { + filesInPath.addAll(getAllFilesInPath(fileInPath.getPath(), conf)); + } else { + filesInPath.add(fileInPath.getPath().toString()); + } + } + } else { + filesInPath.add(path.toString()); + } + + return filesInPath; + } + + private static void printData(List<String> files, + Configuration conf) throws IOException, + JSONException { + for (String file : files) { + try { + Path path = new Path(file); + Reader reader = getReader(path, conf, Lists.<String>newArrayList()); + if (reader == null) { + continue; + } + printJsonData(reader); + System.out.println(SEPARATOR); + } catch (Exception e) { + System.err.println("Unable to dump data for file: " + file); + continue; + } + } + } + + private static void printMetaData(List<String> files, Configuration conf, + List<Integer> rowIndexCols, boolean printTimeZone, final boolean recover, + final String backupPath) + throws IOException { + List<String> corruptFiles = Lists.newArrayList(); + for (String filename : files) { + printMetaDataImpl(filename, conf, rowIndexCols, printTimeZone, corruptFiles); + System.out.println(SEPARATOR); + } + + if (!corruptFiles.isEmpty()) { + if (recover) { + recoverFiles(corruptFiles, conf, backupPath); + } else { + System.err.println(corruptFiles.size() + " file(s) are corrupted." 
+ + " Run the following command to recover corrupted files.\n"); + String fileNames = Joiner.on(" ").skipNulls().join(corruptFiles); + System.err.println("hive --orcfiledump --recover --skip-dump " + fileNames); + System.out.println(SEPARATOR); + } + } + } + + private static void printMetaDataImpl(final String filename, + final Configuration conf, final List<Integer> rowIndexCols, final boolean printTimeZone, + final List<String> corruptFiles) throws IOException { + Path file = new Path(filename); + Reader reader = getReader(file, conf, corruptFiles); + // if we can create reader then footer is not corrupt and file will readable + if (reader == null) { + return; + } + + System.out.println("Structure for " + filename); + System.out.println("File Version: " + reader.getFileVersion().getName() + + " with " + reader.getWriterVersion()); + RecordReaderImpl rows = (RecordReaderImpl) reader.rows(); + System.out.println("Rows: " + reader.getNumberOfRows()); + System.out.println("Compression: " + reader.getCompressionKind()); + if (reader.getCompressionKind() != CompressionKind.NONE) { + System.out.println("Compression size: " + reader.getCompressionSize()); + } + System.out.println("Type: " + reader.getSchema().toString()); + System.out.println("\nStripe Statistics:"); + List<StripeStatistics> stripeStats = reader.getStripeStatistics(); + for (int n = 0; n < stripeStats.size(); n++) { + System.out.println(" Stripe " + (n + 1) + ":"); + StripeStatistics ss = stripeStats.get(n); + for (int i = 0; i < ss.getColumnStatistics().length; ++i) { + System.out.println(" Column " + i + ": " + + ss.getColumnStatistics()[i].toString()); + } + } + ColumnStatistics[] stats = reader.getStatistics(); + int colCount = stats.length; + System.out.println("\nFile Statistics:"); + for (int i = 0; i < stats.length; ++i) { + System.out.println(" Column " + i + ": " + stats[i].toString()); + } + System.out.println("\nStripes:"); + int stripeIx = -1; + for (StripeInformation stripe : reader.getStripes()) { + ++stripeIx; + long stripeStart = stripe.getOffset(); + OrcProto.StripeFooter footer = rows.readStripeFooter(stripe); + if (printTimeZone) { + String tz = footer.getWriterTimezone(); + if (tz == null || tz.isEmpty()) { + tz = UNKNOWN; + } + System.out.println(" Stripe: " + stripe.toString() + " timezone: " + tz); + } else { + System.out.println(" Stripe: " + stripe.toString()); + } + long sectionStart = stripeStart; + for (OrcProto.Stream section : footer.getStreamsList()) { + String kind = section.hasKind() ? 
section.getKind().name() : UNKNOWN; + System.out.println(" Stream: column " + section.getColumn() + + " section " + kind + " start: " + sectionStart + + " length " + section.getLength()); + sectionStart += section.getLength(); + } + for (int i = 0; i < footer.getColumnsCount(); ++i) { + OrcProto.ColumnEncoding encoding = footer.getColumns(i); + StringBuilder buf = new StringBuilder(); + buf.append(" Encoding column "); + buf.append(i); + buf.append(": "); + buf.append(encoding.getKind()); + if (encoding.getKind() == OrcProto.ColumnEncoding.Kind.DICTIONARY || + encoding.getKind() == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) { + buf.append("["); + buf.append(encoding.getDictionarySize()); + buf.append("]"); + } + System.out.println(buf); + } + if (rowIndexCols != null && !rowIndexCols.isEmpty()) { + // include the columns that are specified, only if the columns are included, bloom filter + // will be read + boolean[] sargColumns = new boolean[colCount]; + for (int colIdx : rowIndexCols) { + sargColumns[colIdx] = true; + } + OrcIndex indices = rows + .readRowIndex(stripeIx, null, null, null, sargColumns); + for (int col : rowIndexCols) { + StringBuilder buf = new StringBuilder(); + String rowIdxString = getFormattedRowIndices(col, indices.getRowGroupIndex()); + buf.append(rowIdxString); + String bloomFilString = getFormattedBloomFilters(col, indices.getBloomFilterIndex()); + buf.append(bloomFilString); + System.out.println(buf); + } + } + } + + FileSystem fs = file.getFileSystem(conf); + long fileLen = fs.getFileStatus(file).getLen(); + long paddedBytes = getTotalPaddingSize(reader); + // empty ORC file is ~45 bytes. Assumption here is file length always >0 + double percentPadding = ((double) paddedBytes / (double) fileLen) * 100; + DecimalFormat format = new DecimalFormat("##.##"); + System.out.println("\nFile length: " + fileLen + " bytes"); + System.out.println("Padding length: " + paddedBytes + " bytes"); + System.out.println("Padding ratio: " + format.format(percentPadding) + "%"); + AcidStats acidStats = OrcAcidUtils.parseAcidStats(reader); + if (acidStats != null) { + System.out.println("ACID stats:" + acidStats); + } + rows.close(); + } + + private static void recoverFiles(final List<String> corruptFiles, final Configuration conf, + final String backup) + throws IOException { + for (String corruptFile : corruptFiles) { + System.err.println("Recovering file " + corruptFile); + Path corruptPath = new Path(corruptFile); + FileSystem fs = corruptPath.getFileSystem(conf); + FSDataInputStream fdis = fs.open(corruptPath); + try { + long corruptFileLen = fs.getFileStatus(corruptPath).getLen(); + long remaining = corruptFileLen; + List<Long> footerOffsets = Lists.newArrayList(); + + // start reading the data file form top to bottom and record the valid footers + while (remaining > 0) { + int toRead = (int) Math.min(DEFAULT_BLOCK_SIZE, remaining); + byte[] data = new byte[toRead]; + long startPos = corruptFileLen - remaining; + fdis.readFully(startPos, data, 0, toRead); + + // find all MAGIC string and see if the file is readable from there + int index = 0; + long nextFooterOffset; + + while (index != -1) { + index = indexOf(data, OrcFile.MAGIC.getBytes(), index + 1); + if (index != -1) { + nextFooterOffset = startPos + index + OrcFile.MAGIC.length() + 1; + if (isReadable(corruptPath, conf, nextFooterOffset)) { + footerOffsets.add(nextFooterOffset); + } + } + } + + System.err.println("Scanning for valid footers - startPos: " + startPos + + " toRead: " + toRead + " remaining: " + remaining); + 
remaining = remaining - toRead; + } + + System.err.println("Readable footerOffsets: " + footerOffsets); + recoverFile(corruptPath, fs, conf, footerOffsets, backup); + } catch (Exception e) { + Path recoveryFile = getRecoveryFile(corruptPath); + if (fs.exists(recoveryFile)) { + fs.delete(recoveryFile, false); + } + System.err.println("Unable to recover file " + corruptFile); + e.printStackTrace(); + System.err.println(SEPARATOR); + continue; + } finally { + fdis.close(); + } + System.err.println(corruptFile + " recovered successfully!"); + System.err.println(SEPARATOR); + } + } + + private static void recoverFile(final Path corruptPath, final FileSystem fs, + final Configuration conf, final List<Long> footerOffsets, final String backup) + throws IOException { + + // first recover the file to .recovered file and then once successful rename it to actual file + Path recoveredPath = getRecoveryFile(corruptPath); + + // make sure that file does not exist + if (fs.exists(recoveredPath)) { + fs.delete(recoveredPath, false); + } + + // if there are no valid footers, the file should still be readable so create an empty orc file + if (footerOffsets == null || footerOffsets.isEmpty()) { + System.err.println("No readable footers found. Creating empty orc file."); + TypeDescription schema = TypeDescription.createStruct(); + Writer writer = OrcFile.createWriter(recoveredPath, + OrcFile.writerOptions(conf).setSchema(schema)); + writer.close(); + } else { + FSDataInputStream fdis = fs.open(corruptPath); + FileStatus fileStatus = fs.getFileStatus(corruptPath); + // read corrupt file and copy it to recovered file until last valid footer + FSDataOutputStream fdos = fs.create(recoveredPath, true, + conf.getInt("io.file.buffer.size", 4096), + fileStatus.getReplication(), + fileStatus.getBlockSize()); + try { + long fileLen = footerOffsets.get(footerOffsets.size() - 1); + long remaining = fileLen; + + while (remaining > 0) { + int toRead = (int) Math.min(DEFAULT_BLOCK_SIZE, remaining); + byte[] data = new byte[toRead]; + long startPos = fileLen - remaining; + fdis.readFully(startPos, data, 0, toRead); + fdos.write(data); + System.err.println("Copying data to recovery file - startPos: " + startPos + + " toRead: " + toRead + " remaining: " + remaining); + remaining = remaining - toRead; + } + } catch (Exception e) { + fs.delete(recoveredPath, false); + throw new IOException(e); + } finally { + fdis.close(); + fdos.close(); + } + } + + // validate the recovered file once again and start moving corrupt files to backup folder + if (isReadable(recoveredPath, conf, Long.MAX_VALUE)) { + Path backupDataPath; + String scheme = corruptPath.toUri().getScheme(); + String authority = corruptPath.toUri().getAuthority(); + String filePath = corruptPath.toUri().getPath(); + + // use the same filesystem as corrupt file if backup-path is not explicitly specified + if (backup.equals(DEFAULT_BACKUP_PATH)) { + backupDataPath = new Path(scheme, authority, DEFAULT_BACKUP_PATH + filePath); + } else { + backupDataPath = Path.mergePaths(new Path(backup), corruptPath); + } + + // Move data file to backup path + moveFiles(fs, corruptPath, backupDataPath); + + // Move side file to backup path + Path sideFilePath = OrcAcidUtils.getSideFile(corruptPath); + Path backupSideFilePath = new Path(backupDataPath.getParent(), sideFilePath.getName()); + moveFiles(fs, sideFilePath, backupSideFilePath); + + // finally move recovered file to actual file + moveFiles(fs, recoveredPath, corruptPath); + + // we are done recovering, backing up and validating + 
System.err.println("Validation of recovered file successful!"); + } + } + + private static void moveFiles(final FileSystem fs, final Path src, final Path dest) + throws IOException { + try { + // create the dest directory if not exist + if (!fs.exists(dest.getParent())) { + fs.mkdirs(dest.getParent()); + } + + // if the destination file exists for some reason delete it + fs.delete(dest, false); + + if (fs.rename(src, dest)) { + System.err.println("Moved " + src + " to " + dest); + } else { + throw new IOException("Unable to move " + src + " to " + dest); + } + + } catch (Exception e) { + throw new IOException("Unable to move " + src + " to " + dest, e); + } + } + + private static Path getRecoveryFile(final Path corruptPath) { + return new Path(corruptPath.getParent(), corruptPath.getName() + ".recovered"); + } + + private static boolean isReadable(final Path corruptPath, final Configuration conf, + final long maxLen) { + try { + OrcFile.createReader(corruptPath, OrcFile.readerOptions(conf).maxLength(maxLen)); + return true; + } catch (Exception e) { + // ignore this exception as maxLen is unreadable + return false; + } + } + + // search for byte pattern in another byte array + private static int indexOf(final byte[] data, final byte[] pattern, final int index) { + if (data == null || data.length == 0 || pattern == null || pattern.length == 0 || + index > data.length || index < 0) { + return -1; + } + + int j = 0; + for (int i = index; i < data.length; i++) { + if (pattern[j] == data[i]) { + j++; + } else { + j = 0; + } + + if (j == pattern.length) { + return i - pattern.length + 1; + } + } + + return -1; + } + + private static String getFormattedBloomFilters(int col, + OrcProto.BloomFilterIndex[] bloomFilterIndex) { + StringBuilder buf = new StringBuilder(); + BloomFilterIO stripeLevelBF = null; + if (bloomFilterIndex != null && bloomFilterIndex[col] != null) { + int idx = 0; + buf.append("\n Bloom filters for column ").append(col).append(":"); + for (OrcProto.BloomFilter bf : bloomFilterIndex[col].getBloomFilterList()) { + BloomFilterIO toMerge = new BloomFilterIO(bf); + buf.append("\n Entry ").append(idx++).append(":").append(getBloomFilterStats(toMerge)); + if (stripeLevelBF == null) { + stripeLevelBF = toMerge; + } else { + stripeLevelBF.merge(toMerge); + } + } + String bloomFilterStats = getBloomFilterStats(stripeLevelBF); + buf.append("\n Stripe level merge:").append(bloomFilterStats); + } + return buf.toString(); + } + + private static String getBloomFilterStats(BloomFilterIO bf) { + StringBuilder sb = new StringBuilder(); + int bitCount = bf.getBitSize(); + int popCount = 0; + for (long l : bf.getBitSet()) { + popCount += Long.bitCount(l); + } + int k = bf.getNumHashFunctions(); + float loadFactor = (float) popCount / (float) bitCount; + float expectedFpp = (float) Math.pow(loadFactor, k); + DecimalFormat df = new DecimalFormat("###.####"); + sb.append(" numHashFunctions: ").append(k); + sb.append(" bitCount: ").append(bitCount); + sb.append(" popCount: ").append(popCount); + sb.append(" loadFactor: ").append(df.format(loadFactor)); + sb.append(" expectedFpp: ").append(expectedFpp); + return sb.toString(); + } + + private static String getFormattedRowIndices(int col, + OrcProto.RowIndex[] rowGroupIndex) { + StringBuilder buf = new StringBuilder(); + OrcProto.RowIndex index; + buf.append(" Row group indices for column ").append(col).append(":"); + if (rowGroupIndex == null || (col >= rowGroupIndex.length) || + ((index = rowGroupIndex[col]) == null)) { + buf.append(" not found\n"); + 
return buf.toString(); + } + + for (int entryIx = 0; entryIx < index.getEntryCount(); ++entryIx) { + buf.append("\n Entry ").append(entryIx).append(": "); + OrcProto.RowIndexEntry entry = index.getEntry(entryIx); + if (entry == null) { + buf.append("unknown\n"); + continue; + } + OrcProto.ColumnStatistics colStats = entry.getStatistics(); + if (colStats == null) { + buf.append("no stats at "); + } else { + ColumnStatistics cs = ColumnStatisticsImpl.deserialize(colStats); + buf.append(cs.toString()); + } + buf.append(" positions: "); + for (int posIx = 0; posIx < entry.getPositionsCount(); ++posIx) { + if (posIx != 0) { + buf.append(","); + } + buf.append(entry.getPositions(posIx)); + } + } + return buf.toString(); + } + + public static long getTotalPaddingSize(Reader reader) throws IOException { + long paddedBytes = 0; + List<StripeInformation> stripes = reader.getStripes(); + for (int i = 1; i < stripes.size(); i++) { + long prevStripeOffset = stripes.get(i - 1).getOffset(); + long prevStripeLen = stripes.get(i - 1).getLength(); + paddedBytes += stripes.get(i).getOffset() - (prevStripeOffset + prevStripeLen); + } + return paddedBytes; + } + + static Options createOptions() { + Options result = new Options(); + + // add -d and --data to print the rows + result.addOption(OptionBuilder + .withLongOpt("data") + .withDescription("Should the data be printed") + .create('d')); + + // to avoid breaking unit tests (when run in different time zones) for file dump, printing + // of timezone is made optional + result.addOption(OptionBuilder + .withLongOpt("timezone") + .withDescription("Print writer's time zone") + .create('t')); + + result.addOption(OptionBuilder + .withLongOpt("help") + .withDescription("print help message") + .create('h')); + + result.addOption(OptionBuilder + .withLongOpt("rowindex") + .withArgName("comma separated list of column ids for which row index should be printed") + .withDescription("Dump stats for column number(s)") + .hasArg() + .create('r')); + + result.addOption(OptionBuilder + .withLongOpt("json") + .withDescription("Print metadata in JSON format") + .create('j')); + + result.addOption(OptionBuilder + .withLongOpt("pretty") + .withDescription("Pretty print json metadata output") + .create('p')); + + result.addOption(OptionBuilder + .withLongOpt("recover") + .withDescription("recover corrupted orc files generated by streaming") + .create()); + + result.addOption(OptionBuilder + .withLongOpt("skip-dump") + .withDescription("used along with --recover to directly recover files without dumping") + .create()); + + result.addOption(OptionBuilder + .withLongOpt("backup-path") + .withDescription("specify a backup path to store the corrupted files (default: /tmp)") + .hasArg() + .create()); + return result; + } + + private static void printMap(JSONWriter writer, + MapColumnVector vector, + TypeDescription schema, + int row) throws JSONException { + writer.array(); + TypeDescription keyType = schema.getChildren().get(0); + TypeDescription valueType = schema.getChildren().get(1); + int offset = (int) vector.offsets[row]; + for (int i = 0; i < vector.lengths[row]; ++i) { + writer.object(); + writer.key("_key"); + printValue(writer, vector.keys, keyType, offset + i); + writer.key("_value"); + printValue(writer, vector.values, valueType, offset + i); + writer.endObject(); + } + writer.endArray(); + } + + private static void printList(JSONWriter writer, + ListColumnVector vector, + TypeDescription schema, + int row) throws JSONException { + writer.array(); + int offset = (int) 
vector.offsets[row]; + TypeDescription childType = schema.getChildren().get(0); + for (int i = 0; i < vector.lengths[row]; ++i) { + printValue(writer, vector.child, childType, offset + i); + } + writer.endArray(); + } + + private static void printUnion(JSONWriter writer, + UnionColumnVector vector, + TypeDescription schema, + int row) throws JSONException { + int tag = vector.tags[row]; + printValue(writer, vector.fields[tag], schema.getChildren().get(tag), row); + } + + static void printStruct(JSONWriter writer, + StructColumnVector batch, + TypeDescription schema, + int row) throws JSONException { + writer.object(); + List<String> fieldNames = schema.getFieldNames(); + List<TypeDescription> fieldTypes = schema.getChildren(); + for (int i = 0; i < fieldTypes.size(); ++i) { + writer.key(fieldNames.get(i)); + printValue(writer, batch.fields[i], fieldTypes.get(i), row); + } + writer.endObject(); + } + + static void printBinary(JSONWriter writer, BytesColumnVector vector, + int row) throws JSONException { + writer.array(); + int offset = vector.start[row]; + for(int i=0; i < vector.length[row]; ++i) { + writer.value(0xff & (int) vector.vector[row][offset + i]); + } + writer.endArray(); + } + static void printValue(JSONWriter writer, ColumnVector vector, + TypeDescription schema, int row) throws JSONException { + if (vector.isRepeating) { + row = 0; + } + if (vector.noNulls || !vector.isNull[row]) { + switch (schema.getCategory()) { + case BOOLEAN: + writer.value(((LongColumnVector) vector).vector[row] != 0); + break; + case BYTE: + case SHORT: + case INT: + case LONG: + writer.value(((LongColumnVector) vector).vector[row]); + break; + case FLOAT: + case DOUBLE: + writer.value(((DoubleColumnVector) vector).vector[row]); + break; + case STRING: + case CHAR: + case VARCHAR: + writer.value(((BytesColumnVector) vector).toString(row)); + break; + case BINARY: + printBinary(writer, (BytesColumnVector) vector, row); + break; + case DECIMAL: + writer.value(((DecimalColumnVector) vector).vector[row].toString()); + break; + case DATE: + writer.value(new DateWritable( + (int) ((LongColumnVector) vector).vector[row]).toString()); + break; + case TIMESTAMP: + writer.value(((TimestampColumnVector) vector) + .asScratchTimestamp(row).toString()); + break; + case LIST: + printList(writer, (ListColumnVector) vector, schema, row); + break; + case MAP: + printMap(writer, (MapColumnVector) vector, schema, row); + break; + case STRUCT: + printStruct(writer, (StructColumnVector) vector, schema, row); + break; + case UNION: + printUnion(writer, (UnionColumnVector) vector, schema, row); + break; + default: + throw new IllegalArgumentException("Unknown type " + + schema.toString()); + } + } else { + writer.value(null); + } + } + + static void printRow(JSONWriter writer, + VectorizedRowBatch batch, + TypeDescription schema, + int row) throws JSONException { + if (schema.getCategory() == TypeDescription.Category.STRUCT) { + List<TypeDescription> fieldTypes = schema.getChildren(); + List<String> fieldNames = schema.getFieldNames(); + writer.object(); + for (int c = 0; c < batch.cols.length; ++c) { + writer.key(fieldNames.get(c)); + printValue(writer, batch.cols[c], fieldTypes.get(c), row); + } + writer.endObject(); + } else { + printValue(writer, batch.cols[0], schema, row); + } + } + + static void printJsonData(final Reader reader) throws IOException, JSONException { + PrintStream printStream = System.out; + OutputStreamWriter out = new OutputStreamWriter(printStream, "UTF-8"); + RecordReader rows = reader.rows(); + try 
{ + TypeDescription schema = reader.getSchema(); + VectorizedRowBatch batch = schema.createRowBatch(); + while (rows.nextBatch(batch)) { + for(int r=0; r < batch.size; ++r) { + JSONWriter writer = new JSONWriter(out); + printRow(writer, batch, schema, r); + out.write("\n"); + out.flush(); + if (printStream.checkError()) { + throw new IOException("Error encountered when writing to stdout."); + } + } + } + } finally { + rows.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/tools/JsonFileDump.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/tools/JsonFileDump.java b/java/core/src/java/org/apache/orc/tools/JsonFileDump.java new file mode 100644 index 0000000..75153a2 --- /dev/null +++ b/java/core/src/java/org/apache/orc/tools/JsonFileDump.java @@ -0,0 +1,406 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.tools; + +import java.io.IOException; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.orc.CompressionKind; +import org.apache.orc.Reader; +import org.apache.orc.impl.AcidStats; +import org.apache.orc.impl.OrcAcidUtils; +import org.apache.orc.impl.RecordReaderImpl; +import org.codehaus.jettison.json.JSONArray; +import org.apache.orc.BloomFilterIO; +import org.apache.orc.BinaryColumnStatistics; +import org.apache.orc.BooleanColumnStatistics; +import org.apache.orc.ColumnStatistics; +import org.apache.orc.impl.ColumnStatisticsImpl; +import org.apache.orc.DateColumnStatistics; +import org.apache.orc.DecimalColumnStatistics; +import org.apache.orc.DoubleColumnStatistics; +import org.apache.orc.IntegerColumnStatistics; +import org.apache.orc.impl.OrcIndex; +import org.apache.orc.OrcProto; +import org.apache.orc.StringColumnStatistics; +import org.apache.orc.StripeInformation; +import org.apache.orc.StripeStatistics; +import org.apache.orc.TimestampColumnStatistics; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.codehaus.jettison.json.JSONStringer; +import org.codehaus.jettison.json.JSONWriter; + +/** + * File dump tool with json formatted output. 
+ */ +public class JsonFileDump { + + public static void printJsonMetaData(List<String> files, + Configuration conf, + List<Integer> rowIndexCols, boolean prettyPrint, boolean printTimeZone) + throws JSONException, IOException { + if (files.isEmpty()) { + return; + } + JSONStringer writer = new JSONStringer(); + boolean multiFile = files.size() > 1; + if (multiFile) { + writer.array(); + } else { + writer.object(); + } + for (String filename : files) { + try { + if (multiFile) { + writer.object(); + } + writer.key("fileName").value(filename); + Path path = new Path(filename); + Reader reader = FileDump.getReader(path, conf, null); + if (reader == null) { + writer.key("status").value("FAILED"); + continue; + } + writer.key("fileVersion").value(reader.getFileVersion().getName()); + writer.key("writerVersion").value(reader.getWriterVersion()); + RecordReaderImpl rows = (RecordReaderImpl) reader.rows(); + writer.key("numberOfRows").value(reader.getNumberOfRows()); + writer.key("compression").value(reader.getCompressionKind()); + if (reader.getCompressionKind() != CompressionKind.NONE) { + writer.key("compressionBufferSize").value(reader.getCompressionSize()); + } + writer.key("schemaString").value(reader.getSchema().toString()); + writer.key("schema").array(); + writeSchema(writer, reader.getTypes()); + writer.endArray(); + + writer.key("stripeStatistics").array(); + List<StripeStatistics> stripeStatistics = reader.getStripeStatistics(); + for (int n = 0; n < stripeStatistics.size(); n++) { + writer.object(); + writer.key("stripeNumber").value(n + 1); + StripeStatistics ss = stripeStatistics.get(n); + writer.key("columnStatistics").array(); + for (int i = 0; i < ss.getColumnStatistics().length; i++) { + writer.object(); + writer.key("columnId").value(i); + writeColumnStatistics(writer, ss.getColumnStatistics()[i]); + writer.endObject(); + } + writer.endArray(); + writer.endObject(); + } + writer.endArray(); + + ColumnStatistics[] stats = reader.getStatistics(); + int colCount = stats.length; + writer.key("fileStatistics").array(); + for (int i = 0; i < stats.length; ++i) { + writer.object(); + writer.key("columnId").value(i); + writeColumnStatistics(writer, stats[i]); + writer.endObject(); + } + writer.endArray(); + + writer.key("stripes").array(); + int stripeIx = -1; + for (StripeInformation stripe : reader.getStripes()) { + ++stripeIx; + long stripeStart = stripe.getOffset(); + OrcProto.StripeFooter footer = rows.readStripeFooter(stripe); + writer.object(); // start of stripe information + writer.key("stripeNumber").value(stripeIx + 1); + writer.key("stripeInformation"); + writeStripeInformation(writer, stripe); + if (printTimeZone) { + writer.key("writerTimezone").value( + footer.hasWriterTimezone() ? footer.getWriterTimezone() : FileDump.UNKNOWN); + } + long sectionStart = stripeStart; + + writer.key("streams").array(); + for (OrcProto.Stream section : footer.getStreamsList()) { + writer.object(); + String kind = section.hasKind() ? 
section.getKind().name() : FileDump.UNKNOWN; + writer.key("columnId").value(section.getColumn()); + writer.key("section").value(kind); + writer.key("startOffset").value(sectionStart); + writer.key("length").value(section.getLength()); + sectionStart += section.getLength(); + writer.endObject(); + } + writer.endArray(); + + writer.key("encodings").array(); + for (int i = 0; i < footer.getColumnsCount(); ++i) { + writer.object(); + OrcProto.ColumnEncoding encoding = footer.getColumns(i); + writer.key("columnId").value(i); + writer.key("kind").value(encoding.getKind()); + if (encoding.getKind() == OrcProto.ColumnEncoding.Kind.DICTIONARY || + encoding.getKind() == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) { + writer.key("dictionarySize").value(encoding.getDictionarySize()); + } + writer.endObject(); + } + writer.endArray(); + + if (rowIndexCols != null && !rowIndexCols.isEmpty()) { + // include the columns that are specified, only if the columns are included, bloom filter + // will be read + boolean[] sargColumns = new boolean[colCount]; + for (int colIdx : rowIndexCols) { + sargColumns[colIdx] = true; + } + OrcIndex indices = rows.readRowIndex(stripeIx, null, sargColumns); + writer.key("indexes").array(); + for (int col : rowIndexCols) { + writer.object(); + writer.key("columnId").value(col); + writeRowGroupIndexes(writer, col, indices.getRowGroupIndex()); + writeBloomFilterIndexes(writer, col, indices.getBloomFilterIndex()); + writer.endObject(); + } + writer.endArray(); + } + writer.endObject(); // end of stripe information + } + writer.endArray(); + + FileSystem fs = path.getFileSystem(conf); + long fileLen = fs.getContentSummary(path).getLength(); + long paddedBytes = FileDump.getTotalPaddingSize(reader); + // empty ORC file is ~45 bytes. Assumption here is file length always >0 + double percentPadding = ((double) paddedBytes / (double) fileLen) * 100; + writer.key("fileLength").value(fileLen); + writer.key("paddingLength").value(paddedBytes); + writer.key("paddingRatio").value(percentPadding); + AcidStats acidStats = OrcAcidUtils.parseAcidStats(reader); + if (acidStats != null) { + writer.key("numInserts").value(acidStats.inserts); + writer.key("numDeletes").value(acidStats.deletes); + writer.key("numUpdates").value(acidStats.updates); + } + writer.key("status").value("OK"); + rows.close(); + + writer.endObject(); + } catch (Exception e) { + writer.key("status").value("FAILED"); + throw e; + } + } + if (multiFile) { + writer.endArray(); + } + + if (prettyPrint) { + final String prettyJson; + if (multiFile) { + JSONArray jsonArray = new JSONArray(writer.toString()); + prettyJson = jsonArray.toString(2); + } else { + JSONObject jsonObject = new JSONObject(writer.toString()); + prettyJson = jsonObject.toString(2); + } + System.out.println(prettyJson); + } else { + System.out.println(writer.toString()); + } + } + + private static void writeSchema(JSONStringer writer, List<OrcProto.Type> types) + throws JSONException { + int i = 0; + for(OrcProto.Type type : types) { + writer.object(); + writer.key("columnId").value(i++); + writer.key("columnType").value(type.getKind()); + if (type.getFieldNamesCount() > 0) { + writer.key("childColumnNames").array(); + for (String field : type.getFieldNamesList()) { + writer.value(field); + } + writer.endArray(); + writer.key("childColumnIds").array(); + for (Integer colId : type.getSubtypesList()) { + writer.value(colId); + } + writer.endArray(); + } + if (type.hasPrecision()) { + writer.key("precision").value(type.getPrecision()); + } + + if 
(type.hasScale()) {
+        writer.key("scale").value(type.getScale());
+      }
+
+      if (type.hasMaximumLength()) {
+        writer.key("maxLength").value(type.getMaximumLength());
+      }
+      writer.endObject();
+    }
+  }
+
+  private static void writeStripeInformation(JSONWriter writer, StripeInformation stripe)
+      throws JSONException {
+    writer.object();
+    writer.key("offset").value(stripe.getOffset());
+    writer.key("indexLength").value(stripe.getIndexLength());
+    writer.key("dataLength").value(stripe.getDataLength());
+    writer.key("footerLength").value(stripe.getFooterLength());
+    writer.key("rowCount").value(stripe.getNumberOfRows());
+    writer.endObject();
+  }
+
+  private static void writeColumnStatistics(JSONWriter writer, ColumnStatistics cs)
+      throws JSONException {
+    if (cs != null) {
+      writer.key("count").value(cs.getNumberOfValues());
+      writer.key("hasNull").value(cs.hasNull());
+      if (cs instanceof BinaryColumnStatistics) {
+        writer.key("totalLength").value(((BinaryColumnStatistics) cs).getSum());
+        writer.key("type").value(OrcProto.Type.Kind.BINARY);
+      } else if (cs instanceof BooleanColumnStatistics) {
+        writer.key("trueCount").value(((BooleanColumnStatistics) cs).getTrueCount());
+        writer.key("falseCount").value(((BooleanColumnStatistics) cs).getFalseCount());
+        writer.key("type").value(OrcProto.Type.Kind.BOOLEAN);
+      } else if (cs instanceof IntegerColumnStatistics) {
+        writer.key("min").value(((IntegerColumnStatistics) cs).getMinimum());
+        writer.key("max").value(((IntegerColumnStatistics) cs).getMaximum());
+        if (((IntegerColumnStatistics) cs).isSumDefined()) {
+          writer.key("sum").value(((IntegerColumnStatistics) cs).getSum());
+        }
+        writer.key("type").value(OrcProto.Type.Kind.LONG);
+      } else if (cs instanceof DoubleColumnStatistics) {
+        writer.key("min").value(((DoubleColumnStatistics) cs).getMinimum());
+        writer.key("max").value(((DoubleColumnStatistics) cs).getMaximum());
+        writer.key("sum").value(((DoubleColumnStatistics) cs).getSum());
+        writer.key("type").value(OrcProto.Type.Kind.DOUBLE);
+      } else if (cs instanceof StringColumnStatistics) {
+        writer.key("min").value(((StringColumnStatistics) cs).getMinimum());
+        writer.key("max").value(((StringColumnStatistics) cs).getMaximum());
+        writer.key("totalLength").value(((StringColumnStatistics) cs).getSum());
+        writer.key("type").value(OrcProto.Type.Kind.STRING);
+      } else if (cs instanceof DateColumnStatistics) {
+        if (((DateColumnStatistics) cs).getMaximum() != null) {
+          writer.key("min").value(((DateColumnStatistics) cs).getMinimum());
+          writer.key("max").value(((DateColumnStatistics) cs).getMaximum());
+        }
+        writer.key("type").value(OrcProto.Type.Kind.DATE);
+      } else if (cs instanceof TimestampColumnStatistics) {
+        if (((TimestampColumnStatistics) cs).getMaximum() != null) {
+          writer.key("min").value(((TimestampColumnStatistics) cs).getMinimum());
+          writer.key("max").value(((TimestampColumnStatistics) cs).getMaximum());
+        }
+        writer.key("type").value(OrcProto.Type.Kind.TIMESTAMP);
+      } else if (cs instanceof DecimalColumnStatistics) {
+        if (((DecimalColumnStatistics) cs).getMaximum() != null) {
+          writer.key("min").value(((DecimalColumnStatistics) cs).getMinimum());
+          writer.key("max").value(((DecimalColumnStatistics) cs).getMaximum());
+          writer.key("sum").value(((DecimalColumnStatistics) cs).getSum());
+        }
+        writer.key("type").value(OrcProto.Type.Kind.DECIMAL);
+      }
+    }
+  }
+
+  private static void writeBloomFilterIndexes(JSONWriter writer, int col,
+      OrcProto.BloomFilterIndex[] bloomFilterIndex) throws JSONException {
+
+    BloomFilterIO stripeLevelBF = null;
+    if (bloomFilterIndex != null && bloomFilterIndex[col] != null) {
+      int entryIx = 0;
+      writer.key("bloomFilterIndexes").array();
+      for (OrcProto.BloomFilter bf : bloomFilterIndex[col].getBloomFilterList()) {
+        writer.object();
+        writer.key("entryId").value(entryIx++);
+        BloomFilterIO toMerge = new BloomFilterIO(bf);
+        writeBloomFilterStats(writer, toMerge);
+        if (stripeLevelBF == null) {
+          stripeLevelBF = toMerge;
+        } else {
+          stripeLevelBF.merge(toMerge);
+        }
+        writer.endObject();
+      }
+      writer.endArray();
+    }
+    if (stripeLevelBF != null) {
+      writer.key("stripeLevelBloomFilter");
+      writer.object();
+      writeBloomFilterStats(writer, stripeLevelBF);
+      writer.endObject();
+    }
+  }
+
+  private static void writeBloomFilterStats(JSONWriter writer, BloomFilterIO bf)
+      throws JSONException {
+    int bitCount = bf.getBitSize();
+    int popCount = 0;
+    for (long l : bf.getBitSet()) {
+      popCount += Long.bitCount(l);
+    }
+    int k = bf.getNumHashFunctions();
+    float loadFactor = (float) popCount / (float) bitCount;
+    float expectedFpp = (float) Math.pow(loadFactor, k);
+    writer.key("numHashFunctions").value(k);
+    writer.key("bitCount").value(bitCount);
+    writer.key("popCount").value(popCount);
+    writer.key("loadFactor").value(loadFactor);
+    writer.key("expectedFpp").value(expectedFpp);
+  }
+
+  private static void writeRowGroupIndexes(JSONWriter writer, int col,
+      OrcProto.RowIndex[] rowGroupIndex)
+      throws JSONException {
+
+    OrcProto.RowIndex index;
+    if (rowGroupIndex == null || (col >= rowGroupIndex.length) ||
+        ((index = rowGroupIndex[col]) == null)) {
+      return;
+    }
+
+    writer.key("rowGroupIndexes").array();
+    for (int entryIx = 0; entryIx < index.getEntryCount(); ++entryIx) {
+      writer.object();
+      writer.key("entryId").value(entryIx);
+      OrcProto.RowIndexEntry entry = index.getEntry(entryIx);
+      if (entry == null) {
+        continue;
+      }
+      OrcProto.ColumnStatistics colStats = entry.getStatistics();
+      writeColumnStatistics(writer, ColumnStatisticsImpl.deserialize(colStats));
+      writer.key("positions").array();
+      for (int posIx = 0; posIx < entry.getPositionsCount(); ++posIx) {
+        writer.value(entry.getPositions(posIx));
+      }
+      writer.endArray();
+      writer.endObject();
+    }
+    writer.endArray();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/test/org/apache/orc/TestColumnStatistics.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/TestColumnStatistics.java b/java/core/src/test/org/apache/orc/TestColumnStatistics.java
new file mode 100644
index 0000000..1837dbb
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/TestColumnStatistics.java
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assume.assumeTrue;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.sql.Timestamp;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.orc.impl.ColumnStatisticsImpl;
+import org.apache.orc.tools.FileDump;
+import org.apache.orc.tools.TestFileDump;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Test ColumnStatisticsImpl for ORC.
+ */
+public class TestColumnStatistics {
+
+  @Test
+  public void testLongMerge() throws Exception {
+    TypeDescription schema = TypeDescription.createInt();
+
+    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
+    stats1.updateInteger(10, 2);
+    stats2.updateInteger(1, 1);
+    stats2.updateInteger(1000, 1);
+    stats1.merge(stats2);
+    IntegerColumnStatistics typed = (IntegerColumnStatistics) stats1;
+    assertEquals(1, typed.getMinimum());
+    assertEquals(1000, typed.getMaximum());
+    stats1.reset();
+    stats1.updateInteger(-10, 1);
+    stats1.updateInteger(10000, 1);
+    stats1.merge(stats2);
+    assertEquals(-10, typed.getMinimum());
+    assertEquals(10000, typed.getMaximum());
+  }
+
+  @Test
+  public void testDoubleMerge() throws Exception {
+    TypeDescription schema = TypeDescription.createDouble();
+
+    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
+    stats1.updateDouble(10.0);
+    stats1.updateDouble(100.0);
+    stats2.updateDouble(1.0);
+    stats2.updateDouble(1000.0);
+    stats1.merge(stats2);
+    DoubleColumnStatistics typed = (DoubleColumnStatistics) stats1;
+    assertEquals(1.0, typed.getMinimum(), 0.001);
+    assertEquals(1000.0, typed.getMaximum(), 0.001);
+    stats1.reset();
+    stats1.updateDouble(-10);
+    stats1.updateDouble(10000);
+    stats1.merge(stats2);
+    assertEquals(-10, typed.getMinimum(), 0.001);
+    assertEquals(10000, typed.getMaximum(), 0.001);
+  }
+
+
+  @Test
+  public void testStringMerge() throws Exception {
+    TypeDescription schema = TypeDescription.createString();
+
+    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
+    stats1.updateString(new Text("bob"));
+    stats1.updateString(new Text("david"));
+    stats1.updateString(new Text("charles"));
+    stats2.updateString(new Text("anne"));
+    byte[] erin = new byte[]{0, 1, 2, 3, 4, 5, 101, 114, 105, 110};
+    stats2.updateString(erin, 6, 4, 5);
+    assertEquals(24, ((StringColumnStatistics)stats2).getSum());
+    stats1.merge(stats2);
+    StringColumnStatistics typed = (StringColumnStatistics) stats1;
+    assertEquals("anne", typed.getMinimum());
+    assertEquals("erin", typed.getMaximum());
+    assertEquals(39, typed.getSum());
+    stats1.reset();
+    stats1.updateString(new Text("aaa"));
+    stats1.updateString(new Text("zzz"));
+    stats1.merge(stats2);
+    assertEquals("aaa", typed.getMinimum());
+    assertEquals("zzz", typed.getMaximum());
+  }
+
+  @Test
+  public void testDateMerge() throws Exception {
+    TypeDescription schema = TypeDescription.createDate();
+
+    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
+    stats1.updateDate(new DateWritable(1000));
+    stats1.updateDate(new DateWritable(100));
+    stats2.updateDate(new DateWritable(10));
+    stats2.updateDate(new DateWritable(2000));
+    stats1.merge(stats2);
+    DateColumnStatistics typed = (DateColumnStatistics) stats1;
+    assertEquals(new DateWritable(10).get(), typed.getMinimum());
+    assertEquals(new DateWritable(2000).get(), typed.getMaximum());
+    stats1.reset();
+    stats1.updateDate(new DateWritable(-10));
+    stats1.updateDate(new DateWritable(10000));
+    stats1.merge(stats2);
+    assertEquals(new DateWritable(-10).get(), typed.getMinimum());
+    assertEquals(new DateWritable(10000).get(), typed.getMaximum());
+  }
+
+  @Test
+  public void testTimestampMerge() throws Exception {
+    TypeDescription schema = TypeDescription.createTimestamp();
+
+    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
+    stats1.updateTimestamp(new Timestamp(10));
+    stats1.updateTimestamp(new Timestamp(100));
+    stats2.updateTimestamp(new Timestamp(1));
+    stats2.updateTimestamp(new Timestamp(1000));
+    stats1.merge(stats2);
+    TimestampColumnStatistics typed = (TimestampColumnStatistics) stats1;
+    assertEquals(1, typed.getMinimum().getTime());
+    assertEquals(1000, typed.getMaximum().getTime());
+    stats1.reset();
+    stats1.updateTimestamp(new Timestamp(-10));
+    stats1.updateTimestamp(new Timestamp(10000));
+    stats1.merge(stats2);
+    assertEquals(-10, typed.getMinimum().getTime());
+    assertEquals(10000, typed.getMaximum().getTime());
+  }
+
+  @Test
+  public void testDecimalMerge() throws Exception {
+    TypeDescription schema = TypeDescription.createDecimal()
+        .withPrecision(38).withScale(16);
+
+    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
+    stats1.updateDecimal(HiveDecimal.create(10));
+    stats1.updateDecimal(HiveDecimal.create(100));
+    stats2.updateDecimal(HiveDecimal.create(1));
+    stats2.updateDecimal(HiveDecimal.create(1000));
+    stats1.merge(stats2);
+    DecimalColumnStatistics typed = (DecimalColumnStatistics) stats1;
+    assertEquals(1, typed.getMinimum().longValue());
+    assertEquals(1000, typed.getMaximum().longValue());
+    stats1.reset();
+    stats1.updateDecimal(HiveDecimal.create(-10));
+    stats1.updateDecimal(HiveDecimal.create(10000));
+    stats1.merge(stats2);
+    assertEquals(-10, typed.getMinimum().longValue());
+    assertEquals(10000, typed.getMaximum().longValue());
+  }
+
+
+  Path workDir = new Path(System.getProperty("test.tmp.dir",
+      "target" + File.separator + "test" + File.separator + "tmp"));
+
+  Configuration conf;
+  FileSystem fs;
+  Path testFilePath;
+
+  @Rule
+  public TestName testCaseName = new TestName();
+
+  @Before
+  public void openFileSystem() throws Exception {
+    conf = new Configuration();
+    fs = FileSystem.getLocal(conf);
+    fs.setWorkingDirectory(workDir);
+    testFilePath = new Path("TestOrcFile." + testCaseName.getMethodName() + ".orc");
+    fs.delete(testFilePath, false);
+  }
+
+  private static BytesWritable bytes(int... items) {
+    BytesWritable result = new BytesWritable();
+    result.setSize(items.length);
+    for (int i = 0; i < items.length; ++i) {
+      result.getBytes()[i] = (byte) items[i];
+    }
+    return result;
+  }
+
+  void appendRow(VectorizedRowBatch batch, BytesWritable bytes,
+      String str) {
+    int row = batch.size++;
+    if (bytes == null) {
+      batch.cols[0].noNulls = false;
+      batch.cols[0].isNull[row] = true;
+    } else {
+      ((BytesColumnVector) batch.cols[0]).setVal(row, bytes.getBytes(),
+          0, bytes.getLength());
+    }
+    if (str == null) {
+      batch.cols[1].noNulls = false;
+      batch.cols[1].isNull[row] = true;
+    } else {
+      ((BytesColumnVector) batch.cols[1]).setVal(row, str.getBytes());
+    }
+  }
+
+  @Test
+  public void testHasNull() throws Exception {
+    TypeDescription schema =
+        TypeDescription.createStruct()
+            .addField("bytes1", TypeDescription.createBinary())
+            .addField("string1", TypeDescription.createString());
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .rowIndexStride(1000)
+            .stripeSize(10000)
+            .bufferSize(10000));
+    VectorizedRowBatch batch = schema.createRowBatch(5000);
+    // STRIPE 1
+    // RG1
+    for(int i=0; i<1000; i++) {
+      appendRow(batch, bytes(1, 2, 3), "RG1");
+    }
+    writer.addRowBatch(batch);
+    batch.reset();
+    // RG2
+    for(int i=0; i<1000; i++) {
+      appendRow(batch, bytes(1, 2, 3), null);
+    }
+    writer.addRowBatch(batch);
+    batch.reset();
+    // RG3
+    for(int i=0; i<1000; i++) {
+      appendRow(batch, bytes(1, 2, 3), "RG3");
+    }
+    writer.addRowBatch(batch);
+    batch.reset();
+    // RG4
+    for (int i = 0; i < 1000; i++) {
+      appendRow(batch, bytes(1,2,3), null);
+    }
+    writer.addRowBatch(batch);
+    batch.reset();
+    // RG5
+    for(int i=0; i<1000; i++) {
+      appendRow(batch, bytes(1, 2, 3), null);
+    }
+    writer.addRowBatch(batch);
+    batch.reset();
+    // STRIPE 2
+    for (int i = 0; i < 5000; i++) {
+      appendRow(batch, bytes(1,2,3), null);
+    }
+    writer.addRowBatch(batch);
+    batch.reset();
+    // STRIPE 3
+    for (int i = 0; i < 5000; i++) {
+      appendRow(batch, bytes(1,2,3), "STRIPE-3");
+    }
+    writer.addRowBatch(batch);
+    batch.reset();
+    // STRIPE 4
+    for (int i = 0; i < 5000; i++) {
+      appendRow(batch, bytes(1,2,3), null);
+    }
+    writer.addRowBatch(batch);
+    batch.reset();
+    writer.close();
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+
+    // check the file level stats
+    ColumnStatistics[] stats = reader.getStatistics();
+    assertEquals(20000, stats[0].getNumberOfValues());
+    assertEquals(20000, stats[1].getNumberOfValues());
+    assertEquals(7000, stats[2].getNumberOfValues());
+    assertEquals(false, stats[0].hasNull());
+    assertEquals(false, stats[1].hasNull());
+    assertEquals(true, stats[2].hasNull());
+
+    // check the stripe level stats
+    List<StripeStatistics> stripeStats = reader.getStripeStatistics();
+    // stripe 1 stats
+    StripeStatistics ss1 = stripeStats.get(0);
+    ColumnStatistics ss1_cs1 = ss1.getColumnStatistics()[0];
+    ColumnStatistics ss1_cs2 = ss1.getColumnStatistics()[1];
+    ColumnStatistics ss1_cs3 = ss1.getColumnStatistics()[2];
+    assertEquals(false, ss1_cs1.hasNull());
+    assertEquals(false, ss1_cs2.hasNull());
+    assertEquals(true, ss1_cs3.hasNull());
+
+    // stripe 2 stats
+    StripeStatistics ss2 = stripeStats.get(1);
+    ColumnStatistics ss2_cs1 = ss2.getColumnStatistics()[0];
+    ColumnStatistics ss2_cs2 = ss2.getColumnStatistics()[1];
+    ColumnStatistics ss2_cs3 = ss2.getColumnStatistics()[2];
+    assertEquals(false, ss2_cs1.hasNull());
+    assertEquals(false, ss2_cs2.hasNull());
+    assertEquals(true, ss2_cs3.hasNull());
+
+    // stripe 3 stats
+    StripeStatistics ss3 = stripeStats.get(2);
+    ColumnStatistics ss3_cs1 = ss3.getColumnStatistics()[0];
+    ColumnStatistics ss3_cs2 = ss3.getColumnStatistics()[1];
+    ColumnStatistics ss3_cs3 = ss3.getColumnStatistics()[2];
+    assertEquals(false, ss3_cs1.hasNull());
+    assertEquals(false, ss3_cs2.hasNull());
+    assertEquals(false, ss3_cs3.hasNull());
+
+    // stripe 4 stats
+    StripeStatistics ss4 = stripeStats.get(3);
+    ColumnStatistics ss4_cs1 = ss4.getColumnStatistics()[0];
+    ColumnStatistics ss4_cs2 = ss4.getColumnStatistics()[1];
+    ColumnStatistics ss4_cs3 = ss4.getColumnStatistics()[2];
+    assertEquals(false, ss4_cs1.hasNull());
+    assertEquals(false, ss4_cs2.hasNull());
+    assertEquals(true, ss4_cs3.hasNull());
+
+    // Test file dump
+    PrintStream origOut = System.out;
+    String outputFilename = "orc-file-has-null.out";
+    FileOutputStream myOut = new FileOutputStream(workDir + File.separator + outputFilename);
+
+    // replace stdout and run command
+    System.setOut(new PrintStream(myOut));
+    FileDump.main(new String[]{testFilePath.toString(), "--rowindex=2"});
+    System.out.flush();
+    System.setOut(origOut);
+    // If called with an expression evaluating to false, the test will halt
+    // and be ignored.
+    assumeTrue(!System.getProperty("os.name").startsWith("Windows"));
+    TestFileDump.checkOutput(outputFilename, workDir + File.separator + outputFilename);
+  }
+}
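
A side note on the writeBloomFilterStats method in the JsonFileDump changes above: it reports expectedFpp as the load factor (fraction of set bits) raised to the power of the number of hash functions. The short standalone sketch below is not part of this commit; the class name and sample numbers are hypothetical, and it only illustrates that arithmetic with java.util.BitSet.

import java.util.BitSet;

// Hypothetical illustration of the expectedFpp arithmetic used by
// writeBloomFilterStats above; not part of the committed code.
public class BloomFilterFppSketch {
  public static void main(String[] args) {
    int bitCount = 1024;       // total bits in the filter
    int numHashFunctions = 3;  // k
    BitSet bits = new BitSet(bitCount);
    // pretend roughly a quarter of the bits were set by earlier inserts
    for (int i = 0; i < bitCount; i += 4) {
      bits.set(i);
    }
    int popCount = bits.cardinality();  // number of set bits
    float loadFactor = (float) popCount / (float) bitCount;
    // a lookup reports "maybe present" only when all k probed bits are set,
    // so the false-positive chance is roughly loadFactor^k
    float expectedFpp = (float) Math.pow(loadFactor, numHashFunctions);
    System.out.printf("popCount=%d loadFactor=%.3f expectedFpp=%.4f%n",
        popCount, loadFactor, expectedFpp);
  }
}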
