Github user hyunsik commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/652#discussion_r35528682
  
    --- Diff: 
tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/WriterImpl.java
 ---
    @@ -0,0 +1,2251 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tajo.storage.thirdparty.orc;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Lists;
    +import com.google.common.primitives.Longs;
    +import com.google.protobuf.ByteString;
    +import com.google.protobuf.CodedOutputStream;
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.io.IOConstants;
    +import org.apache.hadoop.hive.serde2.io.DateWritable;
    +import org.apache.hadoop.hive.shims.ShimLoader;
    +import org.apache.tajo.datum.*;
    +import org.apache.tajo.storage.Tuple;
    +import org.apache.tajo.storage.thirdparty.orc.CompressionCodec.Modifier;
    +import org.apache.tajo.storage.thirdparty.orc.OrcProto.RowIndexEntry;
    +import org.apache.tajo.storage.thirdparty.orc.OrcProto.StripeStatistics;
    +import org.apache.tajo.storage.thirdparty.orc.OrcProto.Type;
    +import org.apache.tajo.storage.thirdparty.orc.OrcProto.UserMetadataItem;
    +import org.apache.hadoop.hive.ql.util.JavaDataModel;
    +import org.apache.hadoop.hive.serde2.objectinspector.*;
    +import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
    +import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
    +import org.apache.hadoop.io.Text;
    +import org.apache.tajo.util.datetime.DateTimeUtil;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.lang.management.ManagementFactory;
    +import java.nio.ByteBuffer;
    +import java.sql.Timestamp;
    +import java.util.*;
    +
    +import static com.google.common.base.Preconditions.checkArgument;
    +
    +/**
    + * An ORC file writer. The file is divided into stripes, which is the 
natural
    + * unit of work when reading. Each stripe is buffered in memory until the
    + * memory reaches the stripe size and then it is written out broken down by
    + * columns. Each column is written by a TreeWriter that is specific to that
    + * type of column. TreeWriters may have children TreeWriters that handle 
the
    + * sub-types. Each of the TreeWriters writes the column's data as a set of
    + * streams.
    + *
    + * This class is unsynchronized like most Stream objects, so from the 
creation of an OrcFile and all
    + * access to a single instance has to be from a single thread.
    + *
    + * There are no known cases where these happen between different threads 
today.
    + *
    + * Caveat: the MemoryManager is created during WriterOptions create, that 
has to be confined to a single
    + * thread as well.
    + *
    + */
    +public class WriterImpl implements Writer, MemoryManager.Callback {
    +
    +  private static final Log LOG = LogFactory.getLog(WriterImpl.class);
    +
    +  private static final int HDFS_BUFFER_SIZE = 256 * 1024;
    +  private static final int MIN_ROW_INDEX_STRIDE = 1000;
    +
    +  // threshold above which buffer size will be automatically resized
    +  private static final int COLUMN_COUNT_THRESHOLD = 1000;
    +
    +  private final FileSystem fs;
    +  private final Path path;
    +  private final long defaultStripeSize;
    +  private long adjustedStripeSize;
    +  private final int rowIndexStride;
    +  private final CompressionKind compress;
    +  private final CompressionCodec codec;
    +  private final boolean addBlockPadding;
    +  private final int bufferSize;
    +  private final long blockSize;
    +  private final float paddingTolerance;
    +  // the streams that make up the current stripe
    +  private final Map<StreamName, BufferedStream> streams =
    +    new TreeMap<StreamName, BufferedStream>();
    +
    +  private FSDataOutputStream rawWriter = null;
    +  // the compressed metadata information outStream
    +  private OutStream writer = null;
    +  // a protobuf outStream around streamFactory
    +  private CodedOutputStream protobufWriter = null;
    +  private long headerLength;
    +  private int columnCount;
    +  private long rowCount = 0;
    +  private long rowsInStripe = 0;
    +  private long rawDataSize = 0;
    +  private int rowsInIndex = 0;
    +  private int stripesAtLastFlush = -1;
    +  private final List<OrcProto.StripeInformation> stripes =
    +    new ArrayList<OrcProto.StripeInformation>();
    +  private final Map<String, ByteString> userMetadata =
    +    new TreeMap<String, ByteString>();
    +  private final TreeWriter treeWriter;
    +  private final boolean buildIndex;
    +  private final MemoryManager memoryManager;
    +  private final OrcFile.Version version;
    +  private final Configuration conf;
    +  private final OrcFile.WriterCallback callback;
    +  private final OrcFile.WriterContext callbackContext;
    +  private final OrcFile.EncodingStrategy encodingStrategy;
    +  private final OrcFile.CompressionStrategy compressionStrategy;
    +  private final boolean[] bloomFilterColumns;
    +  private final double bloomFilterFpp;
    +  private boolean writeTimeZone;
    +
    +  WriterImpl(FileSystem fs,
    +      Path path,
    +      Configuration conf,
    +      ObjectInspector inspector,
    +      long stripeSize,
    +      CompressionKind compress,
    +      int bufferSize,
    +      int rowIndexStride,
    +      MemoryManager memoryManager,
    +      boolean addBlockPadding,
    +      OrcFile.Version version,
    +      OrcFile.WriterCallback callback,
    +      OrcFile.EncodingStrategy encodingStrategy,
    +      OrcFile.CompressionStrategy compressionStrategy,
    +      float paddingTolerance,
    +      long blockSizeValue,
    +      String bloomFilterColumnNames,
    +      double bloomFilterFpp) throws IOException {
    +    this.fs = fs;
    +    this.path = path;
    +    this.conf = conf;
    +    this.callback = callback;
    +    if (callback != null) {
    +      callbackContext = new OrcFile.WriterContext(){
    +
    +        @Override
    +        public Writer getWriter() {
    +          return WriterImpl.this;
    +        }
    +      };
    +    } else {
    +      callbackContext = null;
    +    }
    +    this.adjustedStripeSize = stripeSize;
    +    this.defaultStripeSize = stripeSize;
    +    this.version = version;
    +    this.encodingStrategy = encodingStrategy;
    +    this.compressionStrategy = compressionStrategy;
    +    this.addBlockPadding = addBlockPadding;
    +    this.blockSize = blockSizeValue;
    +    this.paddingTolerance = paddingTolerance;
    +    this.compress = compress;
    +    this.rowIndexStride = rowIndexStride;
    +    this.memoryManager = memoryManager;
    +    buildIndex = rowIndexStride > 0;
    +    codec = createCodec(compress);
    +    String allColumns = conf.get(IOConstants.COLUMNS);
    +    if (allColumns == null) {
    +      allColumns = getColumnNamesFromInspector(inspector);
    +    }
    +    this.bufferSize = getEstimatedBufferSize(allColumns, bufferSize);
    +    if (version == OrcFile.Version.V_0_11) {
    +      /* do not write bloom filters for ORC v11 */
    +      this.bloomFilterColumns =
    +          OrcUtils.includeColumns(null, allColumns, inspector);
    +    } else {
    +      this.bloomFilterColumns =
    +          OrcUtils.includeColumns(bloomFilterColumnNames, allColumns, 
inspector);
    +    }
    +    this.bloomFilterFpp = bloomFilterFpp;
    +    treeWriter = createTreeWriter(inspector, new StreamFactory(), false);
    +    if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
    +      throw new IllegalArgumentException("Row stride must be at least " +
    +          MIN_ROW_INDEX_STRIDE);
    +    }
    +
    +    // ensure that we are able to handle callbacks before we register 
ourselves
    +    memoryManager.addWriter(path, stripeSize, this);
    +  }
    +
    +  private String getColumnNamesFromInspector(ObjectInspector inspector) {
    +    List<String> fieldNames = Lists.newArrayList();
    +    Joiner joiner = Joiner.on(",");
    +    if (inspector instanceof StructObjectInspector) {
    +      StructObjectInspector soi = (StructObjectInspector) inspector;
    +      List<? extends StructField> fields = soi.getAllStructFieldRefs();
    +      for(StructField sf : fields) {
    +        fieldNames.add(sf.getFieldName());
    +      }
    +    }
    +    return joiner.join(fieldNames);
    +  }
    +
    +  @VisibleForTesting
    +  int getEstimatedBufferSize(int bs) {
    +      return getEstimatedBufferSize(conf.get(IOConstants.COLUMNS), bs);
    +  }
    +
    +  int getEstimatedBufferSize(String colNames, int bs) {
    +    long availableMem = getMemoryAvailableForORC();
    +    if (colNames != null) {
    +      final int numCols = colNames.split(",").length;
    +      if (numCols > COLUMN_COUNT_THRESHOLD) {
    +        // In BufferedStream, there are 3 outstream buffers (compressed,
    +        // uncompressed and overflow) and list of previously compressed 
buffers.
    +        // Since overflow buffer is rarely used, lets consider only 2 
allocation.
    +        // Also, initially, the list of compression buffers will be empty.
    +        final int outStreamBuffers = codec == null ? 1 : 2;
    +
    +        // max possible streams per column is 5. For string columns, there 
is
    +        // ROW_INDEX, PRESENT, DATA, LENGTH, DICTIONARY_DATA streams.
    +        final int maxStreams = 5;
    +
    +        // Lets assume 10% memory for holding dictionary in memory and 
other
    +        // object allocations
    +        final long miscAllocation = (long) (0.1f * availableMem);
    +
    +        // compute the available memory
    +        final long remainingMem = availableMem - miscAllocation;
    +
    +        int estBufferSize = (int) (remainingMem /
    +            (maxStreams * outStreamBuffers * numCols));
    +        estBufferSize = getClosestBufferSize(estBufferSize, bs);
    +        if (estBufferSize > bs) {
    +          estBufferSize = bs;
    +        }
    +
    +        LOG.info("WIDE TABLE - Number of columns: " + numCols +
    +            " Chosen compression buffer size: " + estBufferSize);
    +        return estBufferSize;
    +      }
    +    }
    +    return bs;
    +  }
    +
    +  private int getClosestBufferSize(int estBufferSize, int bs) {
    +    final int kb4 = 4 * 1024;
    +    final int kb8 = 8 * 1024;
    +    final int kb16 = 16 * 1024;
    +    final int kb32 = 32 * 1024;
    +    final int kb64 = 64 * 1024;
    +    final int kb128 = 128 * 1024;
    +    final int kb256 = 256 * 1024;
    +    if (estBufferSize <= kb4) {
    +      return kb4;
    +    } else if (estBufferSize > kb4 && estBufferSize <= kb8) {
    +      return kb8;
    +    } else if (estBufferSize > kb8 && estBufferSize <= kb16) {
    +      return kb16;
    +    } else if (estBufferSize > kb16 && estBufferSize <= kb32) {
    +      return kb32;
    +    } else if (estBufferSize > kb32 && estBufferSize <= kb64) {
    +      return kb64;
    +    } else if (estBufferSize > kb64 && estBufferSize <= kb128) {
    +      return kb128;
    +    } else {
    +      return kb256;
    +    }
    +  }
    +
    +  // the assumption is only one ORC writer open at a time, which holds 
true for
    +  // most of the cases. HIVE-6455 forces single writer case.
    +  private long getMemoryAvailableForORC() {
    +    OrcConf.ConfVars poolVar = OrcConf.ConfVars.HIVE_ORC_FILE_MEMORY_POOL;
    +    double maxLoad = conf.getFloat(poolVar.varname, 
poolVar.defaultFloatVal);
    +    long totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().
    +        getHeapMemoryUsage().getMax() * maxLoad);
    +    return totalMemoryPool;
    +  }
    +
    +  public static CompressionCodec createCodec(CompressionKind kind) {
    +    switch (kind) {
    +      case NONE:
    +        return null;
    +      case ZLIB:
    +        return new ZlibCodec();
    +      case SNAPPY:
    +        return new SnappyCodec();
    +      case LZO:
    +        try {
    +          Class<? extends CompressionCodec> lzo =
    +              (Class<? extends CompressionCodec>)
    +                  
Class.forName("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
    +          return lzo.newInstance();
    +        } catch (ClassNotFoundException e) {
    +          throw new IllegalArgumentException("LZO is not available.", e);
    +        } catch (InstantiationException e) {
    +          throw new IllegalArgumentException("Problem initializing LZO", 
e);
    +        } catch (IllegalAccessException e) {
    +          throw new IllegalArgumentException("Insufficient access to LZO", 
e);
    +        }
    +      default:
    +        throw new IllegalArgumentException("Unknown compression codec: " +
    +            kind);
    +    }
    +  }
    +
    +  @Override
    +  public boolean checkMemory(double newScale) throws IOException {
    +    long limit = (long) Math.round(adjustedStripeSize * newScale);
    +    long size = estimateStripeSize();
    +    if (LOG.isDebugEnabled()) {
    +      LOG.debug("ORC writer " + path + " size = " + size + " limit = " +
    +                limit);
    +    }
    +    if (size > limit) {
    +      flushStripe();
    +      return true;
    +    }
    +    return false;
    +  }
    +
    +  /**
    +   * This class is used to hold the contents of streams as they are 
buffered.
    +   * The TreeWriters write to the outStream and the codec compresses the
    +   * data as buffers fill up and stores them in the output list. When the
    +   * stripe is being written, the whole stream is written to the file.
    +   */
    +  private class BufferedStream implements OutStream.OutputReceiver {
    +    private final OutStream outStream;
    +    private final List<ByteBuffer> output = new ArrayList<ByteBuffer>();
    +
    +    BufferedStream(String name, int bufferSize,
    +                   CompressionCodec codec) throws IOException {
    +      outStream = new OutStream(name, bufferSize, codec, this);
    +    }
    +
    +    /**
    +     * Receive a buffer from the compression codec.
    +     * @param buffer the buffer to save
    +     * @throws IOException
    +     */
    +    @Override
    +    public void output(ByteBuffer buffer) {
    +      output.add(buffer);
    +    }
    +
    +    /**
    +     * Get the number of bytes in buffers that are allocated to this 
stream.
    +     * @return number of bytes in buffers
    +     */
    +    public long getBufferSize() {
    +      long result = 0;
    +      for(ByteBuffer buf: output) {
    +        result += buf.capacity();
    +      }
    +      return outStream.getBufferSize() + result;
    +    }
    +
    +    /**
    +     * Flush the stream to the codec.
    +     * @throws IOException
    +     */
    +    public void flush() throws IOException {
    +      outStream.flush();
    +    }
    +
    +    /**
    +     * Clear all of the buffers.
    +     * @throws IOException
    +     */
    +    public void clear() throws IOException {
    +      outStream.clear();
    +      output.clear();
    +    }
    +
    +    /**
    +     * Check the state of suppress flag in output stream
    +     * @return value of suppress flag
    +     */
    +    public boolean isSuppressed() {
    +      return outStream.isSuppressed();
    +    }
    +
    +    /**
    +     * Get the number of bytes that will be written to the output. Assumes
    +     * the stream has already been flushed.
    +     * @return the number of bytes
    +     */
    +    public long getOutputSize() {
    +      long result = 0;
    +      for(ByteBuffer buffer: output) {
    +        result += buffer.remaining();
    +      }
    +      return result;
    +    }
    +
    +    /**
    +     * Write the saved compressed buffers to the OutputStream.
    +     * @param out the stream to write to
    +     * @throws IOException
    +     */
    +    void spillTo(OutputStream out) throws IOException {
    +      for(ByteBuffer buffer: output) {
    +        out.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
    +          buffer.remaining());
    +      }
    +    }
    +
    +    @Override
    +    public String toString() {
    +      return outStream.toString();
    +    }
    +  }
    +
    +  /**
    +   * An output receiver that writes the ByteBuffers to the output stream
    +   * as they are received.
    +   */
    +  private class DirectStream implements OutStream.OutputReceiver {
    +    private final FSDataOutputStream output;
    +
    +    DirectStream(FSDataOutputStream output) {
    +      this.output = output;
    +    }
    +
    +    @Override
    +    public void output(ByteBuffer buffer) throws IOException {
    +      output.write(buffer.array(), buffer.arrayOffset() + 
buffer.position(),
    +        buffer.remaining());
    +    }
    +  }
    +
    +  private static class RowIndexPositionRecorder implements 
PositionRecorder {
    +    private final OrcProto.RowIndexEntry.Builder builder;
    +
    +    RowIndexPositionRecorder(OrcProto.RowIndexEntry.Builder builder) {
    +      this.builder = builder;
    +    }
    +
    +    @Override
    +    public void addPosition(long position) {
    +      builder.addPositions(position);
    +    }
    +  }
    +
    +  /**
    +   * Interface from the Writer to the TreeWriters. This limits the 
visibility
    +   * that the TreeWriters have into the Writer.
    +   */
    +  private class StreamFactory {
    +    /**
    +     * Create a stream to store part of a column.
    +     * @param column the column id for the stream
    +     * @param kind the kind of stream
    +     * @return The output outStream that the section needs to be written 
to.
    +     * @throws IOException
    +     */
    +    public OutStream createStream(int column,
    +                                  OrcProto.Stream.Kind kind
    +                                  ) throws IOException {
    +      final StreamName name = new StreamName(column, kind);
    +      final EnumSet<CompressionCodec.Modifier> modifiers;
    +
    +      switch (kind) {
    +        case BLOOM_FILTER:
    +        case DATA:
    +        case DICTIONARY_DATA:
    +          if (getCompressionStrategy() == 
OrcFile.CompressionStrategy.SPEED) {
    +            modifiers = EnumSet.of(Modifier.FAST, Modifier.TEXT);
    +          } else {
    +            modifiers = EnumSet.of(Modifier.DEFAULT, Modifier.TEXT);
    +          }
    +          break;
    +        case LENGTH:
    +        case DICTIONARY_COUNT:
    +        case PRESENT:
    +        case ROW_INDEX:
    +        case SECONDARY:
    +          // easily compressed using the fastest modes
    +          modifiers = EnumSet.of(Modifier.FASTEST, Modifier.BINARY);
    +          break;
    +        default:
    +          LOG.warn("Missing ORC compression modifiers for " + kind);
    +          modifiers = null;
    +          break;
    +      }
    +
    +      BufferedStream result = streams.get(name);
    +      if (result == null) {
    +        result = new BufferedStream(name.toString(), bufferSize,
    +            codec == null ? codec : codec.modify(modifiers));
    +        streams.put(name, result);
    +      }
    +      return result.outStream;
    +    }
    +
    +    /**
    +     * Get the next column id.
    +     * @return a number from 0 to the number of columns - 1
    +     */
    +    public int getNextColumnId() {
    +      return columnCount++;
    +    }
    +
    +    /**
    +     * Get the current column id. After creating all tree writers this 
count should tell how many
    +     * columns (including columns within nested complex objects) are 
created in total.
    +     * @return current column id
    +     */
    +    public int getCurrentColumnId() {
    +      return columnCount;
    +    }
    +
    +    /**
    +     * Get the stride rate of the row index.
    +     */
    +    public int getRowIndexStride() {
    +      return rowIndexStride;
    +    }
    +
    +    /**
    +     * Should be building the row index.
    +     * @return true if we are building the index
    +     */
    +    public boolean buildIndex() {
    +      return buildIndex;
    +    }
    +
    +    /**
    +     * Is the ORC file compressed?
    +     * @return are the streams compressed
    +     */
    +    public boolean isCompressed() {
    +      return codec != null;
    +    }
    +
    +    /**
    +     * Get the encoding strategy to use.
    +     * @return encoding strategy
    +     */
    +    public OrcFile.EncodingStrategy getEncodingStrategy() {
    +      return encodingStrategy;
    +    }
    +
    +    /**
    +     * Get the compression strategy to use.
    +     * @return compression strategy
    +     */
    +    public OrcFile.CompressionStrategy getCompressionStrategy() {
    +      return compressionStrategy;
    +    }
    +
    +    /**
    +     * Get the bloom filter columns
    +     * @return bloom filter columns
    +     */
    +    public boolean[] getBloomFilterColumns() {
    +      return bloomFilterColumns;
    +    }
    +
    +    /**
    +     * Get bloom filter false positive percentage.
    +     * @return fpp
    +     */
    +    public double getBloomFilterFPP() {
    +      return bloomFilterFpp;
    +    }
    +
    +    /**
    +     * Get the writer's configuration.
    +     * @return configuration
    +     */
    +    public Configuration getConfiguration() {
    +      return conf;
    +    }
    +
    +    /**
    +     * Get the version of the file to write.
    +     */
    +    public OrcFile.Version getVersion() {
    +      return version;
    +    }
    +
    +    public void useWriterTimeZone(boolean val) {
    +      writeTimeZone = val;
    +    }
    +
    +    public boolean hasWriterTimeZone() {
    +      return writeTimeZone;
    +    }
    +  }
    +
    +  /**
    +   * The parent class of all of the writers for each column. Each column
    +   * is written by an instance of this class. The compound types (struct,
    +   * list, map, and union) have children tree writers that write the 
children
    +   * types.
    +   */
    +  private abstract static class TreeWriter {
    +    protected final int id;
    +    protected final ObjectInspector inspector;
    +    private final BitFieldWriter isPresent;
    +    private final boolean isCompressed;
    +    protected final ColumnStatisticsImpl indexStatistics;
    +    protected final ColumnStatisticsImpl stripeColStatistics;
    +    private final ColumnStatisticsImpl fileStatistics;
    +    protected TreeWriter[] childrenWriters;
    +    protected final RowIndexPositionRecorder rowIndexPosition;
    +    private final OrcProto.RowIndex.Builder rowIndex;
    +    private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
    +    private final PositionedOutputStream rowIndexStream;
    +    private final PositionedOutputStream bloomFilterStream;
    +    protected final BloomFilterIO bloomFilter;
    +    protected final boolean createBloomFilter;
    +    private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex;
    +    private final OrcProto.BloomFilter.Builder bloomFilterEntry;
    +    private boolean foundNulls;
    +    private OutStream isPresentOutStream;
    +    private final List<StripeStatistics.Builder> stripeStatsBuilders;
    +    private final StreamFactory streamFactory;
    +
    +    /**
    +     * Create a tree writer.
    +     * @param columnId the column id of the column to write
    +     * @param inspector the object inspector to use
    +     * @param streamFactory limited access to the Writer's data.
    +     * @param nullable can the value be null?
    +     * @throws IOException
    +     */
    +    TreeWriter(int columnId, ObjectInspector inspector,
    +               StreamFactory streamFactory,
    +               boolean nullable) throws IOException {
    +      this.streamFactory = streamFactory;
    +      this.isCompressed = streamFactory.isCompressed();
    +      this.id = columnId;
    +      this.inspector = inspector;
    +      if (nullable) {
    +        isPresentOutStream = streamFactory.createStream(id,
    +            OrcProto.Stream.Kind.PRESENT);
    +        isPresent = new BitFieldWriter(isPresentOutStream, 1);
    +      } else {
    +        isPresent = null;
    +      }
    +      this.foundNulls = false;
    +      createBloomFilter = streamFactory.getBloomFilterColumns()[columnId];
    +      indexStatistics = ColumnStatisticsImpl.create(inspector);
    +      stripeColStatistics = ColumnStatisticsImpl.create(inspector);
    +      fileStatistics = ColumnStatisticsImpl.create(inspector);
    +      childrenWriters = new TreeWriter[0];
    +      rowIndex = OrcProto.RowIndex.newBuilder();
    +      rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
    +      rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
    +      stripeStatsBuilders = Lists.newArrayList();
    +      if (streamFactory.buildIndex()) {
    +        rowIndexStream = streamFactory.createStream(id, 
OrcProto.Stream.Kind.ROW_INDEX);
    +      } else {
    +        rowIndexStream = null;
    +      }
    +      if (createBloomFilter) {
    +        bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
    +        bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
    +        bloomFilterStream = streamFactory.createStream(id, 
OrcProto.Stream.Kind.BLOOM_FILTER);
    +        bloomFilter = new BloomFilterIO(streamFactory.getRowIndexStride(),
    +            streamFactory.getBloomFilterFPP());
    +      } else {
    +        bloomFilterEntry = null;
    +        bloomFilterIndex = null;
    +        bloomFilterStream = null;
    +        bloomFilter = null;
    +      }
    +    }
    +
    +    protected OrcProto.RowIndex.Builder getRowIndex() {
    +      return rowIndex;
    +    }
    +
    +    protected ColumnStatisticsImpl getStripeStatistics() {
    +      return stripeColStatistics;
    +    }
    +
    +    protected ColumnStatisticsImpl getFileStatistics() {
    +      return fileStatistics;
    +    }
    +
    +    protected OrcProto.RowIndexEntry.Builder getRowIndexEntry() {
    +      return rowIndexEntry;
    +    }
    +
    +    IntegerWriter createIntegerWriter(PositionedOutputStream output,
    +                                      boolean signed, boolean isDirectV2,
    +                                      StreamFactory writer) {
    +      if (isDirectV2) {
    +        boolean alignedBitpacking = false;
    +        if 
(writer.getEncodingStrategy().equals(OrcFile.EncodingStrategy.SPEED)) {
    +          alignedBitpacking = true;
    +        }
    +        return new RunLengthIntegerWriterV2(output, signed, 
alignedBitpacking);
    +      } else {
    +        return new RunLengthIntegerWriter(output, signed);
    +      }
    +    }
    +
    +    boolean isNewWriteFormat(StreamFactory writer) {
    +      return writer.getVersion() != OrcFile.Version.V_0_11;
    +    }
    +
    +    /**
    +     * Add a new value to the column.
    +     * @param datum
    +     * @throws IOException
    +     */
    +    void write(Datum datum) throws IOException {
    +      if (datum != null && datum.isNotNull()) {
    +        indexStatistics.increment();
    +      } else {
    +        indexStatistics.setNull();
    +      }
    +      if (isPresent != null) {
    +        if(datum == null || datum.isNull()) {
    +          foundNulls = true;
    +          isPresent.write(0);
    +        }
    +        else {
    +          isPresent.write(1);
    +        }
    +      }
    +    }
    +
    +    void write(Tuple tuple) throws IOException {
    +      if (tuple != null) {
    +        indexStatistics.increment();
    +      } else {
    +        indexStatistics.setNull();
    +      }
    +      if (isPresent != null) {
    +        if (tuple == null) {
    +          foundNulls = true;
    +          isPresent.write(0);
    +        } else {
    +          isPresent.write(1);
    +        }
    +      }
    +    }
    +
    +    private void removeIsPresentPositions() {
    +      for(int i=0; i < rowIndex.getEntryCount(); ++i) {
    +        RowIndexEntry.Builder entry = rowIndex.getEntryBuilder(i);
    +        List<Long> positions = entry.getPositionsList();
    +        // bit streams use 3 positions if uncompressed, 4 if compressed
    +        positions = positions.subList(isCompressed ? 4 : 3, 
positions.size());
    +        entry.clearPositions();
    +        entry.addAllPositions(positions);
    +      }
    +    }
    +
    +    /**
    +     * Write the stripe out to the file.
    +     * @param builder the stripe footer that contains the information 
about the
    +     *                layout of the stripe. The TreeWriter is required to 
update
    +     *                the footer with its information.
    +     * @param requiredIndexEntries the number of index entries that are
    +     *                             required. this is to check to make sure 
the
    +     *                             row index is well formed.
    +     * @throws IOException
    +     */
    +    void writeStripe(OrcProto.StripeFooter.Builder builder,
    +                     int requiredIndexEntries) throws IOException {
    +      if (isPresent != null) {
    +        isPresent.flush();
    +
    +        // if no nulls are found in a stream, then suppress the stream
    +        if(!foundNulls) {
    +          isPresentOutStream.suppress();
    +          // since isPresent bitstream is suppressed, update the index to
    +          // remove the positions of the isPresent stream
    +          if (rowIndexStream != null) {
    +            removeIsPresentPositions();
    +          }
    +        }
    +      }
    +
    +      // merge stripe-level column statistics to file statistics and write 
it to
    +      // stripe statistics
    +      OrcProto.StripeStatistics.Builder stripeStatsBuilder = 
OrcProto.StripeStatistics.newBuilder();
    +      writeStripeStatistics(stripeStatsBuilder, this);
    +      stripeStatsBuilders.add(stripeStatsBuilder);
    +
    +      // reset the flag for next stripe
    +      foundNulls = false;
    +
    +      builder.addColumns(getEncoding());
    +      if (streamFactory.hasWriterTimeZone()) {
    --- End diff --
    
    This timezone policy may be different from those of Tajo. Could you check 
the timezone? The following codes would be helpful to this work.
    
    
https://github.com/apache/tajo/blob/master/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java#L56


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to