http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicIntArray.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicIntArray.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicIntArray.java new file mode 100644 index 0000000..a347706 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicIntArray.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tajo.storage.thirdparty.orc; + +/** + * Dynamic int array that uses primitive types and chunks to avoid copying + * large numbers of integers when it resizes. + * + * The motivation for this class is memory optimization, i.e. space-efficient + * storage of potentially huge arrays without good a-priori size guesses. + * + * The API of this class is between a primitive array and an AbstractList. It's + * not a Collection implementation because it handles primitive types, but the + * API could be extended to support iterators and the like. + * + * NOTE: Like standard Collection implementations/arrays, this class is not + * synchronized. + */ +final class DynamicIntArray { + static final int DEFAULT_CHUNKSIZE = 8 * 1024; + static final int INIT_CHUNKS = 128; + + private final int chunkSize; // our allocation size + private int[][] data; // the real data + private int length; // max set element index +1 + private int initializedChunks = 0; // the number of created chunks + + public DynamicIntArray() { + this(DEFAULT_CHUNKSIZE); + } + + public DynamicIntArray(int chunkSize) { + this.chunkSize = chunkSize; + + data = new int[INIT_CHUNKS][]; + } + + /** + * Ensure that the chunk with the given index has been allocated. + */ + private void grow(int chunkIndex) { + if (chunkIndex >= initializedChunks) { + if (chunkIndex >= data.length) { + int newSize = Math.max(chunkIndex + 1, 2 * data.length); + int[][] newChunk = new int[newSize][]; + System.arraycopy(data, 0, newChunk, 0, data.length); + data = newChunk; + } + for (int i=initializedChunks; i <= chunkIndex; ++i) { + data[i] = new int[chunkSize]; + } + initializedChunks = chunkIndex + 1; + } + } + + public int get(int index) { + if (index >= length) { + throw new IndexOutOfBoundsException("Index " + index + + " is outside of 0.."
+ + (length - 1)); + } + int i = index / chunkSize; + int j = index % chunkSize; + return data[i][j]; + } + + public void set(int index, int value) { + int i = index / chunkSize; + int j = index % chunkSize; + grow(i); + if (index >= length) { + length = index + 1; + } + data[i][j] = value; + } + + public void increment(int index, int value) { + int i = index / chunkSize; + int j = index % chunkSize; + grow(i); + if (index >= length) { + length = index + 1; + } + data[i][j] += value; + } + + public void add(int value) { + int i = length / chunkSize; + int j = length % chunkSize; + grow(i); + data[i][j] = value; + length += 1; + } + + public int size() { + return length; + } + + public void clear() { + length = 0; + for(int i=0; i < data.length; ++i) { + data[i] = null; + } + initializedChunks = 0; + } + + public String toString() { + StringBuilder sb = new StringBuilder(length * 4); + + sb.append('{'); + // iterate up to length so that an empty array prints as {} instead of + // calling get(0) and throwing IndexOutOfBoundsException + for (int i=0; i < length; ++i) { + if (i != 0) { + sb.append(','); + } + sb.append(get(i)); + } + sb.append('}'); + + return sb.toString(); + } + + public int getSizeInBytes() { + return 4 * initializedChunks * chunkSize; + } +} +
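For orientation, here is a minimal usage sketch of the chunked array above. The DynamicIntArrayDemo class is hypothetical and not part of this commit, and it must live in the same package because DynamicIntArray is package-private. An index maps to a chunk via index / chunkSize and to an offset via index % chunkSize, so growing the array only allocates new chunks and never copies existing values.

package org.apache.tajo.storage.thirdparty.orc;

public class DynamicIntArrayDemo {
  public static void main(String[] args) {
    DynamicIntArray array = new DynamicIntArray();  // chunkSize = 8 * 1024
    array.add(7);                                   // appends at index 0; size() becomes 1
    array.set(10000, 42);                           // lazily allocates chunk 1 (10000 / 8192)
    array.increment(10000, 1);                      // read-modify-write in place
    System.out.println(array.get(10000));           // 43
    System.out.println(array.size());               // 10001
    System.out.println(array.getSizeInBytes());     // 2 chunks * 8192 ints * 4 bytes = 65536
  }
}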
http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerColumnStatistics.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerColumnStatistics.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerColumnStatistics.java new file mode 100644 index 0000000..208454f --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerColumnStatistics.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tajo.storage.thirdparty.orc; + +/** + * Statistics for all of the integer columns, such as byte, short, int, and + * long. + */ +public interface IntegerColumnStatistics extends ColumnStatistics { + /** + * Get the smallest value in the column. Only defined if getNumberOfValues + * is non-zero. + * @return the minimum + */ + long getMinimum(); + + /** + * Get the largest value in the column. Only defined if getNumberOfValues + * is non-zero. + * @return the maximum + */ + long getMaximum(); + + /** + * Is the sum defined? If the sum overflowed the counter this will be false. + * @return is the sum available + */ + boolean isSumDefined(); + + /** + * Get the sum of the column. Only valid if isSumDefined returns true. + * @return the sum of the column + */ + long getSum(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerWriter.java new file mode 100644 index 0000000..6872882 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerWriter.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.thirdparty.orc; + +import java.io.IOException; + +/** + * Interface for writing integers. + */ +interface IntegerWriter { + + /** + * Record the current position in the stream. + * @param recorder the object that receives the position + * @throws IOException + */ + void getPosition(PositionRecorder recorder) throws IOException; + + /** + * Write the integer value. + * @param value the value to write + * @throws IOException + */ + void write(long value) throws IOException; + + /** + * Flush the buffer. + * @throws IOException + */ + void flush() throws IOException; +} http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/MemoryManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/MemoryManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/MemoryManager.java new file mode 100644 index 0000000..8cd40f7 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/MemoryManager.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.thirdparty.orc; + +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Implements a memory manager that keeps a global context of how many ORC + * writers there are and manages the memory between them. For use cases with + * dynamic partitions, it is easy to end up with many writers in the same task. + * By scaling down the size of each allocation, we try to keep the task from + * running out of memory. + * + * This class is not thread-safe, but it is re-entrant: ensure that creation + * and all invocations are triggered from the same thread.
+ */ +class MemoryManager { + + private static final Log LOG = LogFactory.getLog(MemoryManager.class); + + /** + * How often should we check the memory sizes? Measured in rows added + * to all of the writers. + */ + private static final int ROWS_BETWEEN_CHECKS = 5000; + private final long totalMemoryPool; + private final Map<Path, WriterInfo> writerList = + new HashMap<Path, WriterInfo>(); + private long totalAllocation = 0; + private double currentScale = 1; + private int rowsAddedSinceCheck = 0; + private final OwnedLock ownerLock = new OwnedLock(); + + @SuppressWarnings("serial") + private static class OwnedLock extends ReentrantLock { + public Thread getOwner() { + return super.getOwner(); + } + } + + private static class WriterInfo { + long allocation; + Callback callback; + WriterInfo(long allocation, Callback callback) { + this.allocation = allocation; + this.callback = callback; + } + } + + public interface Callback { + /** + * The writer needs to check its memory usage + * @param newScale the current scale factor for memory allocations + * @return true if the writer was over the limit + * @throws IOException + */ + boolean checkMemory(double newScale) throws IOException; + } + + /** + * Create the memory manager. + * @param conf use the configuration to find the maximum size of the memory + * pool. + */ + MemoryManager(Configuration conf) { + HiveConf.ConfVars poolVar = HiveConf.ConfVars.HIVE_ORC_FILE_MEMORY_POOL; + double maxLoad = conf.getFloat(poolVar.varname, poolVar.defaultFloatVal); + totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean(). + getHeapMemoryUsage().getMax() * maxLoad); + ownerLock.lock(); + } + + /** + * Lightweight thread-safety check that catches accidental multi-threaded + * access. + */ + private void checkOwner() { + Preconditions.checkArgument(ownerLock.isHeldByCurrentThread(), + "Owner thread expected %s, got %s", + ownerLock.getOwner(), + Thread.currentThread()); + } + + /** + * Add a new writer's memory allocation to the pool. We use the path + * as a unique key to ensure that we don't get duplicates. + * @param path the file that is being written + * @param requestedAllocation the requested buffer size + */ + void addWriter(Path path, long requestedAllocation, + Callback callback) throws IOException { + checkOwner(); + WriterInfo oldVal = writerList.get(path); + // this should always be null, but we handle the case where the memory + // manager wasn't told that a writer is no longer in use and the task + // starts writing to the same path again. + if (oldVal == null) { + oldVal = new WriterInfo(requestedAllocation, callback); + writerList.put(path, oldVal); + totalAllocation += requestedAllocation; + } else { + // handle a new writer that is writing to the same path + totalAllocation += requestedAllocation - oldVal.allocation; + oldVal.allocation = requestedAllocation; + oldVal.callback = callback; + } + updateScale(true); + } + + /** + * Remove the given writer from the pool. + * @param path the file that has been closed + */ + void removeWriter(Path path) throws IOException { + checkOwner(); + WriterInfo val = writerList.get(path); + if (val != null) { + writerList.remove(path); + totalAllocation -= val.allocation; + updateScale(false); + } + // once the last writer is gone, reset the row counter + if (writerList.isEmpty()) { + rowsAddedSinceCheck = 0; + } + } + + /** + * Get the total pool size that is available for ORC writers.
+ * @return the number of bytes in the pool + */ + long getTotalMemoryPool() { + return totalMemoryPool; + } + + /** + * The scaling factor for each allocation to ensure that the pool isn't + * oversubscribed. + * @return a fraction between 0.0 and 1.0 of the requested size that is + * available for each writer. + */ + double getAllocationScale() { + return currentScale; + } + + /** + * Give the memory manager an opportunity for doing a memory check. + * @throws IOException + */ + void addedRow() throws IOException { + if (++rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) { + notifyWriters(); + } + } + + /** + * Notify all of the writers that they should check their memory usage. + * @throws IOException + */ + void notifyWriters() throws IOException { + checkOwner(); + LOG.debug("Notifying writers after " + rowsAddedSinceCheck); + for(WriterInfo writer: writerList.values()) { + boolean flushed = writer.callback.checkMemory(currentScale); + if (LOG.isDebugEnabled() && flushed) { + LOG.debug("flushed " + writer.toString()); + } + } + rowsAddedSinceCheck = 0; + } + + /** + * Update the currentScale based on the current allocation and pool size. + * @param isAllocate is this an allocation? + */ + private void updateScale(boolean isAllocate) throws IOException { + if (totalAllocation <= totalMemoryPool) { + currentScale = 1; + } else { + currentScale = (double) totalMemoryPool / totalAllocation; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Metadata.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Metadata.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Metadata.java new file mode 100644 index 0000000..dfa4c36 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Metadata.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.tajo.storage.thirdparty.orc; + +import com.google.common.collect.Lists; + +import java.util.List; + +public class Metadata { + + private final OrcProto.Metadata metadata; + + Metadata(OrcProto.Metadata m) { + this.metadata = m; + } + + /** + * Return list of stripe level column statistics + * + * @return list of stripe statistics + */ + public List<StripeStatistics> getStripeStatistics() { + List<StripeStatistics> result = Lists.newArrayList(); + for (OrcProto.StripeStatistics ss : metadata.getStripeStatsList()) { + result.add(new StripeStatistics(ss.getColStatsList())); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcConf.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcConf.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcConf.java new file mode 100644 index 0000000..b704666 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcConf.java @@ -0,0 +1,149 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.thirdparty.orc; + +import org.apache.hadoop.conf.Configuration; + +// All configs in this class also appear in HiveConf, so any changes here should also be made there +// This is because only HiveConf can provide type checking through the CLI, and Presto depends on +// open source Hive, and so won't work with any variables not in open source HiveConf +public class OrcConf { + + public enum ConfVars { + HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD("hive.exec.orc.dictionary.key.size.threshold", 0.8f), + + // Maximum fraction of heap that can be used by ORC file writers + HIVE_ORC_FILE_MEMORY_POOL("hive.exec.orc.memory.pool", 0.5f), // 50% + HIVE_ORC_FILE_MIN_MEMORY_ALLOCATION("hive.exec.orc.min.mem.allocation", 4194304L), // 4 Mb + HIVE_ORC_FILE_ENABLE_LOW_MEMORY_MODE("hive.exec.orc.low.memory", false), + HIVE_ORC_ROW_BUFFER_SIZE("hive.exec.orc.row.buffer.size", 100), + + HIVE_ORC_EAGER_HDFS_READ("hive.exec.orc.eager.hdfs.read", true), + HIVE_ORC_EAGER_HDFS_READ_BYTES("hive.exec.orc.eager.hdfs.read.bytes", 193986560), // 185 Mb + + HIVE_ORC_ROW_INDEX_STRIDE_DICTIONARY_CHECK("hive.orc.row.index.stride.dictionary.check", true), + HIVE_ORC_DEFAULT_STRIPE_SIZE("hive.exec.orc.default.stripe.size", 64L * 1024 * 1024), + HIVE_ORC_DEFAULT_BLOCK_SIZE("hive.exec.orc.default.block.size", 256L * 1024 * 1024), + HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE("hive.exec.orc.default.row.index.stride", 10000), + HIVE_ORC_DEFAULT_BUFFER_SIZE("hive.exec.orc.default.buffer.size", 256 * 1024), + HIVE_ORC_DEFAULT_BLOCK_PADDING("hive.exec.orc.default.block.padding", true), + HIVE_ORC_DEFAULT_COMPRESS("hive.exec.orc.default.compress", "ZLIB"), + HIVE_ORC_WRITE_FORMAT("hive.exec.orc.write.format", null), // 0.11 or 0.12 + HIVE_ORC_ENCODING_STRATEGY("hive.exec.orc.encoding.strategy", "SPEED"), + HIVE_ORC_COMPRESSION_STRATEGY("hive.exec.orc.compression.strategy", "SPEED"), + HIVE_ORC_BLOCK_PADDING_TOLERANCE("hive.exec.orc.block.padding.tolerance", 0.05f), + + ; + + public final String varname; + public final String defaultVal; + public final int defaultIntVal; + public final long defaultLongVal; + public final float defaultFloatVal; + public final boolean defaultBoolVal; + + + ConfVars(String varname, String defaultVal) { + this.varname = varname; + this.defaultVal = defaultVal; + this.defaultIntVal = -1; + this.defaultLongVal = -1; + this.defaultFloatVal = -1; + this.defaultBoolVal = false; + } + + ConfVars(String varname, int defaultIntVal) { + this.varname = varname; + this.defaultVal = Integer.toString(defaultIntVal); + this.defaultIntVal = defaultIntVal; + this.defaultLongVal = -1; + this.defaultFloatVal = -1; + this.defaultBoolVal = false; + } + + ConfVars(String varname, long defaultLongVal) { + this.varname = varname; + this.defaultVal = Long.toString(defaultLongVal); + this.defaultIntVal = -1; + this.defaultLongVal = defaultLongVal; + this.defaultFloatVal = -1; + this.defaultBoolVal = false; + } + + ConfVars(String varname, float defaultFloatVal) { + this.varname = varname; + this.defaultVal = Float.toString(defaultFloatVal); + this.defaultIntVal = -1; + this.defaultLongVal = -1; + this.defaultFloatVal = defaultFloatVal; + this.defaultBoolVal = false; + } + + ConfVars(String varname, boolean defaultBoolVal) { + this.varname = varname; + this.defaultVal = Boolean.toString(defaultBoolVal); + this.defaultIntVal = -1; + this.defaultLongVal = -1; + this.defaultFloatVal = -1; + this.defaultBoolVal = defaultBoolVal; + } + } + + public static int getIntVar(Configuration conf, ConfVars var) { + 
return conf.getInt(var.varname, var.defaultIntVal); + } + + public static void setIntVar(Configuration conf, ConfVars var, int val) { + conf.setInt(var.varname, val); + } + + public static long getLongVar(Configuration conf, ConfVars var) { + return conf.getLong(var.varname, var.defaultLongVal); + } + + public static void setLongVar(Configuration conf, ConfVars var, long val) { + conf.setLong(var.varname, val); + } + + public static float getFloatVar(Configuration conf, ConfVars var) { + return conf.getFloat(var.varname, var.defaultFloatVal); + } + + public static void setFloatVar(Configuration conf, ConfVars var, float val) { + conf.setFloat(var.varname, val); + } + + public static boolean getBoolVar(Configuration conf, ConfVars var) { + return conf.getBoolean(var.varname, var.defaultBoolVal); + } + + public static void setBoolVar(Configuration conf, ConfVars var, boolean val) { + conf.setBoolean(var.varname, val); + } + + public static String getVar(Configuration conf, ConfVars var) { + return conf.get(var.varname, var.defaultVal); + } + + public static void setVar(Configuration conf, ConfVars var, String val) { + conf.set(var.varname, val); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java new file mode 100644 index 0000000..a291953 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java @@ -0,0 +1,460 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.thirdparty.orc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +import static org.apache.tajo.storage.thirdparty.orc.OrcConf.ConfVars.*; + +import java.io.IOException; +import java.util.TimeZone; + +/** + * Contains factory methods to read or write ORC files. + */ +public final class OrcFile { + + public static final String MAGIC = "ORC"; + + /** + * Create a version number for the ORC file format, so that we can add + * non-forward compatible changes in the future. To make it easier for users + * to understand the version numbers, we use the Hive release number that + * first wrote that version of ORC files. 
+ * + * Thus, if you add new encodings or other non-forward compatible changes + * to ORC files, which prevent the old reader from reading the new format, + * you should change this variable to reflect the next Hive release number. + * Non-forward compatible changes should never be added in patch releases. + * + * Do not make any changes that break backwards compatibility, which would + * prevent the new reader from reading ORC files generated by any released + * version of Hive. + */ + public static enum Version { + V_0_11("0.11", 0, 11), + V_0_12("0.12", 0, 12); + + public static final Version CURRENT = V_0_12; + + private final String name; + private final int major; + private final int minor; + + private Version(String name, int major, int minor) { + this.name = name; + this.major = major; + this.minor = minor; + } + + public static Version byName(String name) { + for(Version version: values()) { + if (version.name.equals(name)) { + return version; + } + } + throw new IllegalArgumentException("Unknown ORC version " + name); + } + + /** + * Get the human readable name for the version. + */ + public String getName() { + return name; + } + + /** + * Get the major version number. + */ + public int getMajor() { + return major; + } + + /** + * Get the minor version number. + */ + public int getMinor() { + return minor; + } + } + + /** + * Records the version of the writer in terms of which bugs have been fixed. + * For bugs fixed in the writer where old readers still read the new data + * correctly, bump this version instead of the Version. + */ + public static enum WriterVersion { + ORIGINAL(0), + HIVE_8732(1); // corrupted stripe/file maximum column statistics + + private final int id; + + public int getId() { + return id; + } + + private WriterVersion(int id) { + this.id = id; + } + } + + public static enum EncodingStrategy { + SPEED, COMPRESSION; + } + + public static enum CompressionStrategy { + SPEED, COMPRESSION; + } + + // Note : these string definitions for table properties are deprecated, + // and retained only for backward compatibility, please do not add to + // them, add to OrcTableProperties below instead + @Deprecated public static final String COMPRESSION = "orc.compress"; + @Deprecated public static final String COMPRESSION_BLOCK_SIZE = "orc.compress.size"; + @Deprecated public static final String STRIPE_SIZE = "orc.stripe.size"; + @Deprecated public static final String ROW_INDEX_STRIDE = "orc.row.index.stride"; + @Deprecated public static final String ENABLE_INDEXES = "orc.create.index"; + @Deprecated public static final String BLOCK_PADDING = "orc.block.padding"; + + /** + * Enum container for all orc table properties. + * If introducing a new orc-specific table property, + * add it here.
+ */ + public static enum OrcTableProperties { + COMPRESSION("orc.compress"), + COMPRESSION_BLOCK_SIZE("orc.compress.size"), + STRIPE_SIZE("orc.stripe.size"), + BLOCK_SIZE("orc.block.size"), + ROW_INDEX_STRIDE("orc.row.index.stride"), + ENABLE_INDEXES("orc.create.index"), + BLOCK_PADDING("orc.block.padding"), + ENCODING_STRATEGY("orc.encoding.strategy"), + BLOOM_FILTER_COLUMNS("orc.bloom.filter.columns"), + BLOOM_FILTER_FPP("orc.bloom.filter.fpp"); + + private final String propName; + + OrcTableProperties(String propName) { + this.propName = propName; + } + + public String getPropName(){ + return this.propName; + } + } + + // unused + private OrcFile() {} + + public static interface WriterContext { + Writer getWriter(); + } + + public static interface WriterCallback { + public void preStripeWrite(WriterContext context) throws IOException; + public void preFooterWrite(WriterContext context) throws IOException; + } + + /** + * Options for creating ORC file writers. + */ + public static class WriterOptions { + private final Configuration configuration; + private FileSystem fileSystemValue = null; + private ObjectInspector inspectorValue = null; + private long stripeSizeValue; + private long blockSizeValue; + private int rowIndexStrideValue; + private int bufferSizeValue; + private boolean blockPaddingValue; + private CompressionKind compressValue; + private MemoryManager memoryManagerValue; + private Version versionValue; + private WriterCallback callback; + private EncodingStrategy encodingStrategy; + private CompressionStrategy compressionStrategy; + private float paddingTolerance; + private String bloomFilterColumns; + private double bloomFilterFpp; + private TimeZone timezone; + + WriterOptions(Configuration conf) { + configuration = conf; + memoryManagerValue = getMemoryManager(conf); + stripeSizeValue = OrcConf.getLongVar(conf, HIVE_ORC_DEFAULT_STRIPE_SIZE); + blockSizeValue = OrcConf.getLongVar(conf, HIVE_ORC_DEFAULT_BLOCK_SIZE); + rowIndexStrideValue = OrcConf.getIntVar(conf, HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE); + bufferSizeValue = OrcConf.getIntVar(conf, HIVE_ORC_DEFAULT_BUFFER_SIZE); + blockPaddingValue = OrcConf.getBoolVar(conf, HIVE_ORC_DEFAULT_BLOCK_PADDING); + compressValue = CompressionKind.valueOf(OrcConf.getVar(conf, HIVE_ORC_DEFAULT_COMPRESS)); + String versionName = OrcConf.getVar(conf, HIVE_ORC_WRITE_FORMAT); + if (versionName == null) { + versionValue = Version.CURRENT; + } else { + versionValue = Version.byName(versionName); + } + String enString = + conf.get(OrcConf.ConfVars.HIVE_ORC_ENCODING_STRATEGY.varname); + if (enString == null) { + encodingStrategy = EncodingStrategy.SPEED; + } else { + encodingStrategy = EncodingStrategy.valueOf(enString); + } + + String compString = conf + .get(OrcConf.ConfVars.HIVE_ORC_COMPRESSION_STRATEGY.varname); + if (compString == null) { + compressionStrategy = CompressionStrategy.SPEED; + } else { + compressionStrategy = CompressionStrategy.valueOf(compString); + } + + paddingTolerance = conf.getFloat(OrcConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.varname, + OrcConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.defaultFloatVal); + bloomFilterFpp = BloomFilterIO.DEFAULT_FPP; + } + + /** + * Provide the filesystem for the path, if the client has it available. + * If it is not provided, it will be found from the path. + */ + public WriterOptions fileSystem(FileSystem value) { + fileSystemValue = value; + return this; + } + + /** + * Set the stripe size for the file. 
The writer stores the contents of the + * stripe in memory until this memory limit is reached and the stripe + * is flushed to the HDFS file and the next stripe started. + */ + public WriterOptions stripeSize(long value) { + stripeSizeValue = value; + return this; + } + + /** + * Set the file system block size for the file. For optimal performance, + * set the block size to be a multiple of the stripe size. + */ + public WriterOptions blockSize(long value) { + blockSizeValue = value; + return this; + } + + /** + * Set the distance between entries in the row index. The minimum value is + * 1000 to prevent the index from overwhelming the data. If the stride is + * set to 0, no indexes will be included in the file. + */ + public WriterOptions rowIndexStride(int value) { + rowIndexStrideValue = value; + return this; + } + + /** + * The size of the memory buffers used for compressing and storing the + * stripe in memory. + */ + public WriterOptions bufferSize(int value) { + bufferSizeValue = value; + return this; + } + + /** + * Sets whether the HDFS blocks are padded to prevent stripes from + * straddling blocks. Padding improves locality and thus the speed of + * reading, but costs space. + */ + public WriterOptions blockPadding(boolean value) { + blockPaddingValue = value; + return this; + } + + /** + * Sets the encoding strategy that is used to encode the data. + */ + public WriterOptions encodingStrategy(EncodingStrategy strategy) { + encodingStrategy = strategy; + return this; + } + + /** + * Sets the tolerance for block padding as a percentage of stripe size. + */ + public WriterOptions paddingTolerance(float value) { + paddingTolerance = value; + return this; + } + + /** + * Comma-separated list of column names for which bloom filters are to be created. + */ + public WriterOptions bloomFilterColumns(String columns) { + bloomFilterColumns = columns; + return this; + } + + /** + * Specify the false positive probability for the bloom filter. + * @param fpp - false positive probability + * @return this + */ + public WriterOptions bloomFilterFpp(double fpp) { + bloomFilterFpp = fpp; + return this; + } + + /** + * Sets the generic compression that is used to compress the data. + */ + public WriterOptions compress(CompressionKind value) { + compressValue = value; + return this; + } + + /** + * A required option that sets the object inspector for the rows. Used + * to determine the schema for the file. + */ + public WriterOptions inspector(ObjectInspector value) { + inspectorValue = value; + return this; + } + + /** + * Sets the version of the file that will be written. + */ + public WriterOptions version(Version value) { + versionValue = value; + return this; + } + + /** + * Add a listener for when the stripe and file are about to be closed. + * @param callback the object to be called when the stripe is closed + * @return this + */ + public WriterOptions callback(WriterCallback callback) { + this.callback = callback; + return this; + } + + /** + * A package local option to set the memory manager. + */ + WriterOptions memory(MemoryManager value) { + memoryManagerValue = value; + return this; + } + + /** + * A Tajo-specific option to set the writer timezone. + */ + WriterOptions timezone(TimeZone value) { + timezone = value; + return this; + } + } + + /** + * Create a default set of write options that can be modified. + */ + public static WriterOptions writerOptions(Configuration conf) { + return new WriterOptions(conf); + } + + /** + * Create an ORC file writer.
This is the public interface for creating + * writers going forward and new options will only be added to this method. + * @param path filename to write to + * @param opts the options + * @return a new ORC file writer + * @throws IOException + */ + public static Writer createWriter(Path path, + WriterOptions opts + ) throws IOException { + FileSystem fs = opts.fileSystemValue == null ? + path.getFileSystem(opts.configuration) : opts.fileSystemValue; + + return new WriterImpl(fs, path, opts.configuration, opts.inspectorValue, + opts.stripeSizeValue, opts.compressValue, + opts.bufferSizeValue, opts.rowIndexStrideValue, + opts.memoryManagerValue, opts.blockPaddingValue, + opts.versionValue, opts.callback, + opts.encodingStrategy, opts.compressionStrategy, + opts.paddingTolerance, opts.blockSizeValue, + opts.bloomFilterColumns, opts.bloomFilterFpp, + opts.timezone); + } + + /** + * Create an ORC file writer. This method is provided for API backward + * compatibility with Hive 0.11. + * @param fs file system + * @param path filename to write to + * @param inspector the ObjectInspector that inspects the rows + * @param stripeSize the number of bytes in a stripe + * @param compress how to compress the file + * @param bufferSize the number of bytes to compress at once + * @param rowIndexStride the number of rows between row index entries or + * 0 to suppress all indexes + * @return a new ORC file writer + * @throws IOException + */ + public static Writer createWriter(FileSystem fs, + Path path, + Configuration conf, + ObjectInspector inspector, + long stripeSize, + CompressionKind compress, + int bufferSize, + int rowIndexStride, + TimeZone timeZone) throws IOException { + return createWriter(path, + writerOptions(conf) + .fileSystem(fs) + .inspector(inspector) + .stripeSize(stripeSize) + .compress(compress) + .bufferSize(bufferSize) + .rowIndexStride(rowIndexStride) + .timezone(timeZone)); + } + + private static ThreadLocal<MemoryManager> memoryManager = null; + + private static synchronized MemoryManager getMemoryManager(final Configuration conf) { + if (memoryManager == null) { + memoryManager = new ThreadLocal<MemoryManager>() { + @Override + protected MemoryManager initialValue() { + return new MemoryManager(conf); + } + }; + } + return memoryManager.get(); + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcUtils.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcUtils.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcUtils.java new file mode 100644 index 0000000..847c10c --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcUtils.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tajo.storage.thirdparty.orc; + +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.serde2.objectinspector.*; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class OrcUtils { + private static final Log LOG = LogFactory.getLog(OrcUtils.class); + + /** + * Returns the selected columns as a boolean array, with true set for each specified column name. + * The result contains a number of elements equal to the flattened number of columns. + * For example: + * selectedColumns - a,b,c + * allColumns - a,b,c,d + * If column c is a complex type, say list<string>, and the other types are primitives, then the + * result will be [false, true, true, true, true, false]. + * Index 0 is the root element of the struct, which is set to false by default; indices 1 and 2 + * correspond to columns a and b. Indices 3 and 4 correspond to column c, which is a list<string>, + * and index 5 corresponds to column d. After flattening, list<string> takes 2 columns. + * + * @param selectedColumns - comma separated list of selected column names + * @param allColumns - comma separated list of all column names + * @param inspector - object inspector + * @return - boolean array with true value set for the specified column names + */ + public static boolean[] includeColumns(String selectedColumns, String allColumns, + ObjectInspector inspector) { + int numFlattenedCols = getFlattenedColumnsCount(inspector); + boolean[] results = new boolean[numFlattenedCols]; + if ("*".equals(selectedColumns)) { + Arrays.fill(results, true); + return results; + } + if (selectedColumns != null && !selectedColumns.isEmpty()) { + includeColumnsImpl(results, selectedColumns.toLowerCase(), allColumns, inspector); + } + return results; + } + + private static void includeColumnsImpl(boolean[] includeColumns, String selectedColumns, + String allColumns, + ObjectInspector inspector) { + Map<String, List<Integer>> columnSpanMap = getColumnSpan(allColumns, inspector); + LOG.info("columnSpanMap: " + columnSpanMap); + + String[] selCols = selectedColumns.split(","); + for (String sc : selCols) { + if (columnSpanMap.containsKey(sc)) { + List<Integer> colSpan = columnSpanMap.get(sc); + int start = colSpan.get(0); + int end = colSpan.get(1); + for (int i = start; i <= end; i++) { + includeColumns[i] = true; + } + } + } + + LOG.info("includeColumns: " + Arrays.toString(includeColumns)); + } + + private static Map<String, List<Integer>> getColumnSpan(String allColumns, + ObjectInspector inspector) { + // map that contains the column span for each column. Column span is the number of columns + // required after flattening. For a given object inspector this map contains the start column + // id and end column id (both inclusive) after flattening.
+ // EXAMPLE: + // schema: struct<a:int, b:float, c:map<string,int>> + // column span map for the above struct will be + // a => [1,1], b => [2,2], c => [3,5] + Map<String, List<Integer>> columnSpanMap = new HashMap<String, List<Integer>>(); + if (allColumns != null) { + String[] columns = allColumns.split(","); + int startIdx = 0; + int endIdx = 0; + if (inspector instanceof StructObjectInspector) { + StructObjectInspector soi = (StructObjectInspector) inspector; + List<? extends StructField> fields = soi.getAllStructFieldRefs(); + for (int i = 0; i < fields.size(); i++) { + StructField sf = fields.get(i); + + // we get the type (category) from the object inspector, but the column name from the + // argument. The reason is that Hive (FileSinkOperator) does not pass the actual column + // names; instead it passes the internal column names (_col1, _col2). + ObjectInspector sfOI = sf.getFieldObjectInspector(); + String colName = columns[i]; + + startIdx = endIdx + 1; + switch (sfOI.getCategory()) { + case PRIMITIVE: + endIdx += 1; + break; + case STRUCT: + endIdx += 1; + StructObjectInspector structInsp = (StructObjectInspector) sfOI; + List<? extends StructField> structFields = structInsp.getAllStructFieldRefs(); + for (int j = 0; j < structFields.size(); ++j) { + endIdx += getFlattenedColumnsCount(structFields.get(j).getFieldObjectInspector()); + } + break; + case MAP: + endIdx += 1; + MapObjectInspector mapInsp = (MapObjectInspector) sfOI; + endIdx += getFlattenedColumnsCount(mapInsp.getMapKeyObjectInspector()); + endIdx += getFlattenedColumnsCount(mapInsp.getMapValueObjectInspector()); + break; + case LIST: + endIdx += 1; + ListObjectInspector listInsp = (ListObjectInspector) sfOI; + endIdx += getFlattenedColumnsCount(listInsp.getListElementObjectInspector()); + break; + case UNION: + endIdx += 1; + UnionObjectInspector unionInsp = (UnionObjectInspector) sfOI; + List<ObjectInspector> choices = unionInsp.getObjectInspectors(); + for (int j = 0; j < choices.size(); ++j) { + endIdx += getFlattenedColumnsCount(choices.get(j)); + } + break; + default: + // report the field's category, since that is what the switch inspects + throw new IllegalArgumentException("Bad category: " + + sfOI.getCategory()); + } + + columnSpanMap.put(colName, Lists.newArrayList(startIdx, endIdx)); + } + } + } + return columnSpanMap; + } + + /** + * Returns the number of columns after flattening complex types. + * + * @param inspector - object inspector + * @return the number of flattened columns + */ + public static int getFlattenedColumnsCount(ObjectInspector inspector) { + int numWriters = 0; + switch (inspector.getCategory()) { + case PRIMITIVE: + numWriters += 1; + break; + case STRUCT: + numWriters += 1; + StructObjectInspector structInsp = (StructObjectInspector) inspector; + List<?
extends StructField> fields = structInsp.getAllStructFieldRefs(); + for (int i = 0; i < fields.size(); ++i) { + numWriters += getFlattenedColumnsCount(fields.get(i).getFieldObjectInspector()); + } + break; + case MAP: + numWriters += 1; + MapObjectInspector mapInsp = (MapObjectInspector) inspector; + numWriters += getFlattenedColumnsCount(mapInsp.getMapKeyObjectInspector()); + numWriters += getFlattenedColumnsCount(mapInsp.getMapValueObjectInspector()); + break; + case LIST: + numWriters += 1; + ListObjectInspector listInsp = (ListObjectInspector) inspector; + numWriters += getFlattenedColumnsCount(listInsp.getListElementObjectInspector()); + break; + case UNION: + numWriters += 1; + UnionObjectInspector unionInsp = (UnionObjectInspector) inspector; + List<ObjectInspector> choices = unionInsp.getObjectInspectors(); + for (int i = 0; i < choices.size(); ++i) { + numWriters += getFlattenedColumnsCount(choices.get(i)); + } + break; + default: + throw new IllegalArgumentException("Bad category: " + + inspector.getCategory()); + } + return numWriters; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OutStream.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OutStream.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OutStream.java new file mode 100644 index 0000000..f6cfd57 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OutStream.java @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tajo.storage.thirdparty.orc; + +import java.io.IOException; +import java.nio.ByteBuffer; + +class OutStream extends PositionedOutputStream { + + interface OutputReceiver { + /** + * Output the given buffer to the final destination + * @param buffer the buffer to output + * @throws IOException + */ + void output(ByteBuffer buffer) throws IOException; + } + + static final int HEADER_SIZE = 3; + private final String name; + private final OutputReceiver receiver; + // if enabled the stream will be suppressed when writing stripe + private boolean suppress; + + /** + * Stores the uncompressed bytes that have been serialized, but not + * compressed yet. When this fills, we compress the entire buffer. + */ + private ByteBuffer current = null; + + /** + * Stores the compressed bytes until we have a full buffer and then outputs + * them to the receiver. If no compression is being done, this (and overflow) + * will always be null and the current buffer will be sent directly to the + * receiver. 
+ */ + private ByteBuffer compressed = null; + + /** + * Since the compressed buffer may start with contents from previous + * compression blocks, we allocate an overflow buffer so that the + * output of the codec can be split between the two buffers. After the + * compressed buffer is sent to the receiver, the overflow buffer becomes + * the new compressed buffer. + */ + private ByteBuffer overflow = null; + private final int bufferSize; + private final CompressionCodec codec; + private long compressedBytes = 0; + private long uncompressedBytes = 0; + + OutStream(String name, + int bufferSize, + CompressionCodec codec, + OutputReceiver receiver) throws IOException { + this.name = name; + this.bufferSize = bufferSize; + this.codec = codec; + this.receiver = receiver; + this.suppress = false; + } + + public void clear() throws IOException { + flush(); + suppress = false; + } + + /** + * Write the length of the compressed bytes. Life is much easier if the + * header is constant length, so just use 3 bytes. Considering most of the + * codecs want between 32k (snappy) and 256k (lzo, zlib), 3 bytes should + * be plenty. We also use the low bit for whether it is the original or + * compressed bytes. + * @param buffer the buffer to write the header to + * @param position the position in the buffer to write at + * @param val the size in the file + * @param original is it uncompressed + */ + private static void writeHeader(ByteBuffer buffer, + int position, + int val, + boolean original) { + buffer.put(position, (byte) ((val << 1) + (original ? 1 : 0))); + buffer.put(position + 1, (byte) (val >> 7)); + buffer.put(position + 2, (byte) (val >> 15)); + } + + private void getNewInputBuffer() throws IOException { + if (codec == null) { + current = ByteBuffer.allocate(bufferSize); + } else { + current = ByteBuffer.allocate(bufferSize + HEADER_SIZE); + writeHeader(current, 0, bufferSize, true); + current.position(HEADER_SIZE); + } + } + + /** + * Allocate a new output buffer if we are compressing. + */ + private ByteBuffer getNewOutputBuffer() throws IOException { + return ByteBuffer.allocate(bufferSize + HEADER_SIZE); + } + + private void flip() throws IOException { + current.limit(current.position()); + current.position(codec == null ? 0 : HEADER_SIZE); + } + + @Override + public void write(int i) throws IOException { + if (current == null) { + getNewInputBuffer(); + } + if (current.remaining() < 1) { + spill(); + } + uncompressedBytes += 1; + current.put((byte) i); + } + + @Override + public void write(byte[] bytes, int offset, int length) throws IOException { + if (current == null) { + getNewInputBuffer(); + } + int remaining = Math.min(current.remaining(), length); + current.put(bytes, offset, remaining); + uncompressedBytes += remaining; + length -= remaining; + while (length != 0) { + spill(); + offset += remaining; + remaining = Math.min(current.remaining(), length); + current.put(bytes, offset, remaining); + uncompressedBytes += remaining; + length -= remaining; + } + } + + private void spill() throws IOException { + // if there isn't anything in the current buffer, don't spill + if (current == null || + current.position() == (codec == null ? 
0 : HEADER_SIZE)) { + return; + } + flip(); + if (codec == null) { + receiver.output(current); + getNewInputBuffer(); + } else { + if (compressed == null) { + compressed = getNewOutputBuffer(); + } else if (overflow == null) { + overflow = getNewOutputBuffer(); + } + int sizePosn = compressed.position(); + compressed.position(compressed.position() + HEADER_SIZE); + if (codec.compress(current, compressed, overflow)) { + uncompressedBytes = 0; + // move position back to after the header + current.position(HEADER_SIZE); + current.limit(current.capacity()); + // find the total bytes in the chunk + int totalBytes = compressed.position() - sizePosn - HEADER_SIZE; + if (overflow != null) { + totalBytes += overflow.position(); + } + compressedBytes += totalBytes + HEADER_SIZE; + writeHeader(compressed, sizePosn, totalBytes, false); + // if we have less than the next header left, spill it. + if (compressed.remaining() < HEADER_SIZE) { + compressed.flip(); + receiver.output(compressed); + compressed = overflow; + overflow = null; + } + } else { + compressedBytes += uncompressedBytes + HEADER_SIZE; + uncompressedBytes = 0; + // we are using the original, but need to spill the current + // compressed buffer first. So back up to where we started, + // flip it and add it to done. + if (sizePosn != 0) { + compressed.position(sizePosn); + compressed.flip(); + receiver.output(compressed); + compressed = null; + // if we have an overflow, clear it and make it the new compress + // buffer + if (overflow != null) { + overflow.clear(); + compressed = overflow; + overflow = null; + } + } else { + compressed.clear(); + if (overflow != null) { + overflow.clear(); + } + } + + // now add the current buffer into the done list and get a new one. + current.position(0); + // update the header with the current length + writeHeader(current, 0, current.limit() - HEADER_SIZE, true); + receiver.output(current); + getNewInputBuffer(); + } + } + } + + void getPosition(PositionRecorder recorder) throws IOException { + if (codec == null) { + recorder.addPosition(uncompressedBytes); + } else { + recorder.addPosition(compressedBytes); + recorder.addPosition(uncompressedBytes); + } + } + + @Override + public void flush() throws IOException { + spill(); + if (compressed != null && compressed.position() != 0) { + compressed.flip(); + receiver.output(compressed); + compressed = null; + } + uncompressedBytes = 0; + compressedBytes = 0; + overflow = null; + current = null; + } + + @Override + public String toString() { + return name; + } + + @Override + public long getBufferSize() { + long result = 0; + if (current != null) { + result += current.capacity(); + } + if (compressed != null) { + result += compressed.capacity(); + } + if (overflow != null) { + result += overflow.capacity(); + } + return result; + } + + /** + * Set suppress flag + */ + public void suppress() { + suppress = true; + } + + /** + * Returns the state of suppress flag + * @return value of suppress flag + */ + public boolean isSuppressed() { + return suppress; + } +} + http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionRecorder.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionRecorder.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionRecorder.java new file mode 100644 index 0000000..a39926e 
--- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionRecorder.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tajo.storage.thirdparty.orc; + +/** + * An interface for recording positions in a stream. + */ +interface PositionRecorder { + void addPosition(long offset); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionedOutputStream.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionedOutputStream.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionedOutputStream.java new file mode 100644 index 0000000..748c98c --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionedOutputStream.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tajo.storage.thirdparty.orc; + +import java.io.IOException; +import java.io.OutputStream; + +abstract class PositionedOutputStream extends OutputStream { + + /** + * Record the current position to the recorder. + * @param recorder the object that receives the position + * @throws IOException + */ + abstract void getPosition(PositionRecorder recorder) throws IOException; + + /** + * Get the memory size currently allocated as buffer associated with this + * stream. + * @return the number of bytes used by buffers. 
+ */ + abstract long getBufferSize(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RedBlackTree.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RedBlackTree.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RedBlackTree.java new file mode 100644 index 0000000..2482f93 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RedBlackTree.java @@ -0,0 +1,309 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.thirdparty.orc; + +/** + * A memory efficient red-black tree that does not allocate any objects per + * element. This class is abstract and assumes that the child class + * handles the key and comparisons with the key. + */ +abstract class RedBlackTree { + public static final int NULL = -1; + + // Various values controlling the offset of the data within the array. + private static final int LEFT_OFFSET = 0; + private static final int RIGHT_OFFSET = 1; + private static final int ELEMENT_SIZE = 2; + + protected int size = 0; + private final DynamicIntArray data; + protected int root = NULL; + protected int lastAdd = 0; + private boolean wasAdd = false; + + /** + * Create a set with the given initial capacity. + */ + public RedBlackTree(int initialCapacity) { + data = new DynamicIntArray(initialCapacity * ELEMENT_SIZE); + } + + /** + * Insert a new node into the data array, growing the array as necessary. + * + * @return Returns the position of the new node. + */ + private int insert(int left, int right, boolean isRed) { + int position = size; + size += 1; + setLeft(position, left, isRed); + setRight(position, right); + return position; + } + + /** + * Compare the value at the given position to the new value. + * @return 0 if the values are the same, -1 if the new value is smaller and + * 1 if the new value is larger. + */ + protected abstract int compareValue(int position); + + /** + * Is the given node red as opposed to black? To prevent having an extra word + * in the data array, we just use the low bit on the left child index. + */ + protected boolean isRed(int position) { + return position != NULL && + (data.get(position * ELEMENT_SIZE + LEFT_OFFSET) & 1) == 1; + } + + /** + * Set the red bit true or false. + */ + private void setRed(int position, boolean isRed) { + int offset = position * ELEMENT_SIZE + LEFT_OFFSET; + if (isRed) { + data.set(offset, data.get(offset) | 1); + } else { + data.set(offset, data.get(offset) & ~1); + } + } + + /** + * Get the left field of the given position.
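+ * The arithmetic right shift in the implementation below discards the color + * bit; because the shift is signed, a NULL (-1) left pointer survives the + * round trip.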
+ */ + protected int getLeft(int position) { + return data.get(position * ELEMENT_SIZE + LEFT_OFFSET) >> 1; + } + + /** + * Get the right field of the given position. + */ + protected int getRight(int position) { + return data.get(position * ELEMENT_SIZE + RIGHT_OFFSET); + } + + /** + * Set the left field of the given position. + * Note that we are storing the node color in the low bit of the left pointer. + */ + private void setLeft(int position, int left) { + int offset = position * ELEMENT_SIZE + LEFT_OFFSET; + data.set(offset, (left << 1) | (data.get(offset) & 1)); + } + + /** + * Set the left field of the given position. + * Note that we are storing the node color in the low bit of the left pointer. + */ + private void setLeft(int position, int left, boolean isRed) { + int offset = position * ELEMENT_SIZE + LEFT_OFFSET; + data.set(offset, (left << 1) | (isRed ? 1 : 0)); + } + + /** + * Set the right field of the given position. + */ + private void setRight(int position, int right) { + data.set(position * ELEMENT_SIZE + RIGHT_OFFSET, right); + } + + /** + * Insert or find a given key in the tree and rebalance the tree correctly. + * Rebalancing restores the red-black aspect of the tree to maintain the + * invariants: + * 1. If a node is red, both of its children are black. + * 2. Each child of a node has the same black height (the number of black + * nodes between it and the leaves of the tree). + * + * Inserted nodes are at the leaves and are red, therefore there is at most a + * violation of rule 1 at the node we just put in. Instead of always keeping + * the parents, this routine passes down the context. + * + * The fix is broken down into 6 cases (1.{1,2,3} and 2.{1,2,3}) that are + * left-right mirror images of each other. See Algorithms by Cormen, + * Leiserson, and Rivest for the explanation of the subcases. + * + * @param node The node that we are fixing right now. + * @param fromLeft Did we come down from the left? + * @param parent Node's parent + * @param grandparent Parent's parent + * @param greatGrandparent Grandparent's parent + * @return Does parent also need to be checked and/or fixed? + */ + private boolean add(int node, boolean fromLeft, int parent, + int grandparent, int greatGrandparent) { + if (node == NULL) { + if (root == NULL) { + lastAdd = insert(NULL, NULL, false); + root = lastAdd; + wasAdd = true; + return false; + } else { + lastAdd = insert(NULL, NULL, true); + node = lastAdd; + wasAdd = true; + // connect the new node into the tree + if (fromLeft) { + setLeft(parent, node); + } else { + setRight(parent, node); + } + } + } else { + int compare = compareValue(node); + boolean keepGoing; + + // Recurse down to find where the node needs to be added + if (compare < 0) { + keepGoing = add(getLeft(node), true, node, parent, grandparent); + } else if (compare > 0) { + keepGoing = add(getRight(node), false, node, parent, grandparent); + } else { + lastAdd = node; + wasAdd = false; + return false; + } + + // we don't need to fix the root (because it is always set to black) + if (node == root || !keepGoing) { + return false; + } + } + + + // Do we need to fix this node? Only if there are two reds right under each + // other.
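+ // A red parent implies a black grandparent, so the red-red pair here is + // the only possible violation: recolor when the uncle is also red, + // otherwise rotate (the 1.x cases mirror the 2.x cases).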
+ if (isRed(node) && isRed(parent)) { + if (parent == getLeft(grandparent)) { + int uncle = getRight(grandparent); + if (isRed(uncle)) { + // case 1.1 + setRed(parent, false); + setRed(uncle, false); + setRed(grandparent, true); + return true; + } else { + if (node == getRight(parent)) { + // case 1.2 + // swap node and parent + int tmp = node; + node = parent; + parent = tmp; + // left-rotate on node + setLeft(grandparent, parent); + setRight(node, getLeft(parent)); + setLeft(parent, node); + } + + // case 1.2 and 1.3 + setRed(parent, false); + setRed(grandparent, true); + + // right-rotate on grandparent + if (greatGrandparent == NULL) { + root = parent; + } else if (getLeft(greatGrandparent) == grandparent) { + setLeft(greatGrandparent, parent); + } else { + setRight(greatGrandparent, parent); + } + setLeft(grandparent, getRight(parent)); + setRight(parent, grandparent); + return false; + } + } else { + int uncle = getLeft(grandparent); + if (isRed(uncle)) { + // case 2.1 + setRed(parent, false); + setRed(uncle, false); + setRed(grandparent, true); + return true; + } else { + if (node == getLeft(parent)) { + // case 2.2 + // swap node and parent + int tmp = node; + node = parent; + parent = tmp; + // right-rotate on node + setRight(grandparent, parent); + setLeft(node, getRight(parent)); + setRight(parent, node); + } + // case 2.2 and 2.3 + setRed(parent, false); + setRed(grandparent, true); + // left-rotate on grandparent + if (greatGrandparent == NULL) { + root = parent; + } else if (getRight(greatGrandparent) == grandparent) { + setRight(greatGrandparent, parent); + } else { + setLeft(greatGrandparent, parent); + } + setRight(grandparent, getLeft(parent)); + setLeft(parent, grandparent); + return false; + } + } + } else { + return true; + } + } + + /** + * Add the new key to the tree. + * @return true if the element is a new one. + */ + protected boolean add() { + add(root, false, NULL, NULL, NULL); + if (wasAdd) { + setRed(root, false); + return true; + } else { + return false; + } + } + + /** + * Get the number of elements in the set. + */ + public int size() { + return size; + } + + /** + * Reset the table to empty. + */ + public void clear() { + root = NULL; + size = 0; + data.clear(); + } + + /** + * Get the buffer size in bytes. + */ + public long getSizeInBytes() { + return data.getSizeInBytes(); + } +} + http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthByteWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthByteWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthByteWriter.java new file mode 100644 index 0000000..0953cdd --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthByteWriter.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tajo.storage.thirdparty.orc; + +import java.io.IOException; + +/** + * A stream writer that writes a sequence of bytes. A control byte is written before + * each run with positive values 0 to 127 meaning 3 to 130 repetitions. If the + * control byte is -1 to -128, 1 to 128 literal byte values follow. + */ +class RunLengthByteWriter { + static final int MIN_REPEAT_SIZE = 3; + static final int MAX_LITERAL_SIZE = 128; + static final int MAX_REPEAT_SIZE = 127 + MIN_REPEAT_SIZE; + private final PositionedOutputStream output; + private final byte[] literals = new byte[MAX_LITERAL_SIZE]; + private int numLiterals = 0; + private boolean repeat = false; + private int tailRunLength = 0; + + RunLengthByteWriter(PositionedOutputStream output) { + this.output = output; + } + + private void writeValues() throws IOException { + if (numLiterals != 0) { + if (repeat) { + output.write(numLiterals - MIN_REPEAT_SIZE); + output.write(literals, 0, 1); + } else { + output.write(-numLiterals); + output.write(literals, 0, numLiterals); + } + repeat = false; + tailRunLength = 0; + numLiterals = 0; + } + } + + void flush() throws IOException { + writeValues(); + output.flush(); + } + + void write(byte value) throws IOException { + if (numLiterals == 0) { + literals[numLiterals++] = value; + tailRunLength = 1; + } else if (repeat) { + if (value == literals[0]) { + numLiterals += 1; + if (numLiterals == MAX_REPEAT_SIZE) { + writeValues(); + } + } else { + writeValues(); + literals[numLiterals++] = value; + tailRunLength = 1; + } + } else { + if (value == literals[numLiterals - 1]) { + tailRunLength += 1; + } else { + tailRunLength = 1; + } + if (tailRunLength == MIN_REPEAT_SIZE) { + if (numLiterals + 1 == MIN_REPEAT_SIZE) { + repeat = true; + numLiterals += 1; + } else { + numLiterals -= MIN_REPEAT_SIZE - 1; + writeValues(); + literals[0] = value; + repeat = true; + numLiterals = MIN_REPEAT_SIZE; + } + } else { + literals[numLiterals++] = value; + if (numLiterals == MAX_LITERAL_SIZE) { + writeValues(); + } + } + } + } + + void getPosition(PositionRecorder recorder) throws IOException { + output.getPosition(recorder); + recorder.addPosition(numLiterals); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriter.java new file mode 100644 index 0000000..867f041 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriter.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tajo.storage.thirdparty.orc; + +import java.io.IOException; + +/** + * A stream writer that writes a sequence of integers. A control byte is written before + * each run with positive values 0 to 127 meaning 3 to 130 repetitions; each + * repetition is offset by the delta. If the control byte is -1 to -128, 1 to 128 + * literal vint values follow. + */ +class RunLengthIntegerWriter implements IntegerWriter { + static final int MIN_REPEAT_SIZE = 3; + static final int MAX_DELTA = 127; + static final int MIN_DELTA = -128; + static final int MAX_LITERAL_SIZE = 128; + private static final int MAX_REPEAT_SIZE = 127 + MIN_REPEAT_SIZE; + private final PositionedOutputStream output; + private final boolean signed; + private final long[] literals = new long[MAX_LITERAL_SIZE]; + private int numLiterals = 0; + private long delta = 0; + private boolean repeat = false; + private int tailRunLength = 0; + private final SerializationUtils utils; + + RunLengthIntegerWriter(PositionedOutputStream output, + boolean signed) { + this.output = output; + this.signed = signed; + this.utils = new SerializationUtils(); + } + + private void writeValues() throws IOException { + if (numLiterals != 0) { + if (repeat) { + output.write(numLiterals - MIN_REPEAT_SIZE); + output.write((byte) delta); + if (signed) { + utils.writeVslong(output, literals[0]); + } else { + utils.writeVulong(output, literals[0]); + } + } else { + output.write(-numLiterals); + for(int i=0; i < numLiterals; ++i) { + if (signed) { + utils.writeVslong(output, literals[i]); + } else { + utils.writeVulong(output, literals[i]); + } + } + } + repeat = false; + numLiterals = 0; + tailRunLength = 0; + } + } + + @Override + public void flush() throws IOException { + writeValues(); + output.flush(); + } + + @Override + public void write(long value) throws IOException { + if (numLiterals == 0) { + literals[numLiterals++] = value; + tailRunLength = 1; + } else if (repeat) { + if (value == literals[0] + delta * numLiterals) { + numLiterals += 1; + if (numLiterals == MAX_REPEAT_SIZE) { + writeValues(); + } + } else { + writeValues(); + literals[numLiterals++] = value; + tailRunLength = 1; + } + } else { + if (tailRunLength == 1) { + delta = value - literals[numLiterals - 1]; + if (delta < MIN_DELTA || delta > MAX_DELTA) { + tailRunLength = 1; + } else { + tailRunLength = 2; + } + } else if (value == literals[numLiterals - 1] + delta) { + tailRunLength += 1; + } else { + delta = value - literals[numLiterals - 1]; + if (delta < MIN_DELTA || delta > MAX_DELTA) { + tailRunLength = 1; + } else { + tailRunLength = 2; + } + } + if (tailRunLength == MIN_REPEAT_SIZE) { + if (numLiterals + 1 == MIN_REPEAT_SIZE) { + repeat = true; + numLiterals += 1; + } else { + numLiterals -= MIN_REPEAT_SIZE - 1; + long base = literals[numLiterals]; + writeValues(); + literals[0] = base; + repeat = true; + numLiterals = MIN_REPEAT_SIZE; + } + } else { + literals[numLiterals++] = value; + if
(numLiterals == MAX_LITERAL_SIZE) { + writeValues(); + } + } + } + } + + @Override + public void getPosition(PositionRecorder recorder) throws IOException { + output.getPosition(recorder); + recorder.addPosition(numLiterals); + } + +}
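To make the encoding above concrete, here is a minimal decoder sketch for the signed-integer run-length format that RunLengthIntegerWriter emits. It is not part of the patch: the class and method names are illustrative, and it assumes a well-formed stream written with signed=true, so values are zigzag-encoded vints matching SerializationUtils.writeVslong.

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

class RunLengthIntegerDecoderSketch {
  // Base-128 varint, least-significant group first (inverse of writeVulong).
  static long readVulong(InputStream in) throws IOException {
    long result = 0;
    int offset = 0;
    int b;
    do {
      b = in.read();
      if (b == -1) {
        throw new EOFException("truncated vint");
      }
      result |= (long) (b & 0x7f) << offset;
      offset += 7;
    } while (b >= 0x80);
    return result;
  }

  // Zigzag-decode a signed value (inverse of writeVslong).
  static long readVslong(InputStream in) throws IOException {
    long value = readVulong(in);
    return (value >>> 1) ^ -(value & 1);
  }

  static List<Long> decode(InputStream in) throws IOException {
    List<Long> result = new ArrayList<>();
    int b;
    while ((b = in.read()) != -1) {
      byte control = (byte) b;
      if (control >= 0) {
        // Run: control byte n means n + MIN_REPEAT_SIZE repetitions,
        // then one signed delta byte, then the vint base value.
        int length = control + 3;
        long delta = (byte) in.read();
        long base = readVslong(in);
        for (int i = 0; i < length; ++i) {
          result.add(base + i * delta);
        }
      } else {
        // Literals: -control vint values follow.
        for (int i = 0; i < -control; ++i) {
          result.add(readVslong(in));
        }
      }
    }
    return result;
  }

  public static void main(String[] args) throws IOException {
    // 2,3,4,5,6 is a run of five with delta 1: control 5-3=2, delta 1,
    // and zigzag(2)=4 for the base, so three bytes in total.
    byte[] encoded = {2, 1, 4};
    System.out.println(decode(new ByteArrayInputStream(encoded))); // [2, 3, 4, 5, 6]
  }
}

The byte-oriented RunLengthByteWriter framing decodes the same way, except that a run stores a single repeated byte and no delta.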

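RedBlackTree's trick of hiding the node color in the low bit of the left-child index is easy to get wrong, so here is a tiny self-contained sketch of just that packing. Again the names are illustrative and this is not part of the patch; it mirrors setLeft/getLeft/isRed above.

class ColorBitPackingSketch {
  static final int NULL = -1;

  // Mirror of setLeft(position, left, isRed): the color occupies the low bit.
  static int pack(int leftChild, boolean isRed) {
    return (leftChild << 1) | (isRed ? 1 : 0);
  }

  // Mirror of getLeft: the arithmetic shift matters so that a NULL (-1)
  // child comes back as -1 rather than as a large positive index.
  static int leftChild(int packed) {
    return packed >> 1;
  }

  static boolean isRed(int packed) {
    return (packed & 1) == 1;
  }

  public static void main(String[] args) {
    int word = pack(42, true);
    System.out.println(leftChild(word) + " red=" + isRed(word)); // 42 red=true
    System.out.println(leftChild(pack(NULL, false)));            // -1
  }
}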