pgaref commented on a change in pull request #635:
URL: https://github.com/apache/orc/pull/635#discussion_r578392205



##########
File path: java/core/src/java/org/apache/orc/filter/OrcFilterContext.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.filter;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.filter.MutableFilterContext;
+import org.apache.orc.TypeDescription;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * This defines the input for any filter operation.
+ * <p>
+ * It offers a convenience method for finding the column vector from a given name, that the filters
+ * can invoke to get access to the column vector.
+ */
+public class OrcFilterContext implements MutableFilterContext {
+
+  VectorizedRowBatch batch = null;
+  // Cache of field to ColumnVector, this is reset everytime the batch reference changes
+  private final Map<String, ColumnVector> vectors;
+  private final TypeDescription readSchema;
+
+  public OrcFilterContext(TypeDescription readSchema) {
+    this.readSchema = readSchema;
+    this.vectors = new HashMap<>();
+  }
+
+  public OrcFilterContext setBatch(@NotNull VectorizedRowBatch batch) {
+    if (batch != this.batch) {
+      this.batch = batch;
+      vectors.clear();
+    }
+    return this;
+  }
+
+  @Override
+  public void setFilterContext(boolean selectedInUse, int[] selected, int selectedSize) {
+    batch.setFilterContext(selectedInUse, selected, selectedSize);
+  }
+
+  @Override
+  public boolean validateSelected() {
+    return batch.validateSelected();
+  }
+
+  @Override
+  public int[] updateSelected(int i) {
+    return batch.updateSelected(i);
+  }
+
+  @Override
+  public void setSelectedInUse(boolean b) {
+    batch.setSelectedInUse(b);
+  }
+
+  @Override
+  public void setSelected(int[] ints) {
+    batch.setSelected(ints);
+  }
+
+  @Override
+  public void setSelectedSize(int i) {
+    batch.setSelectedSize(i);
+  }
+
+  @Override
+  public void reset() {
+    batch.reset();
+  }
+
+  @Override
+  public boolean isSelectedInUse() {
+    return batch.isSelectedInUse();
+  }
+
+  @Override
+  public int[] getSelected() {
+    return batch.getSelected();
+  }
+
+  @Override
+  public int getSelectedSize() {
+    return batch.getSelectedSize();
+  }
+
+  public ColumnVector[] getCols() {
+    return batch.cols;
+  }
+
+  /**
+   * Retrieves the column vector that matches the specified name. Allows support for nested struct
+   * references e.g. order.date where data is a field in a struct called order.
+   *
+   * @param name The column name whose vector should be retrieved
+   * @return The column vector
+   * @throws IllegalArgumentException if the field is not found or if the nested field is not part
+   *                                  of a struct
+   */
+  public ColumnVector findColumnVector(String name) {
+    if (!vectors.containsKey(name)) {
+      vectors.put(name, findVector(name));
+    }
+
+    return vectors.get(name);
+  }
+
+  private ColumnVector findVector(String name) {
+    String[] refs = name.split(SPLIT_ON_PERIOD);

Review comment:
       We already have utility methods that can be used here, for example ParserUtils.splitName.
   Actually, RecordReaderImpl.findColumns does everything you need under the hood -- you just need to add the extra check for the struct type.
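   For illustration, a rough sketch of a struct-aware lookup (the manual split below is only a stand-in; ParserUtils.splitName or RecordReaderImpl.findColumns would replace it, and their exact signatures are not verified here):

```java
// Sketch only: resolves "order.date" style references against the read schema.
private ColumnVector findVector(String name) {
  String[] refs = name.split("\\.");           // stand-in for ParserUtils.splitName
  TypeDescription type = readSchema;
  ColumnVector[] level = batch.cols;
  ColumnVector vector = null;
  for (String ref : refs) {
    if (level == null || type.getCategory() != TypeDescription.Category.STRUCT) {
      throw new IllegalArgumentException("Field " + ref + " in " + name + " is not part of a struct");
    }
    int idx = type.getFieldNames().indexOf(ref);
    if (idx < 0) {
      throw new IllegalArgumentException("Field " + ref + " not found in " + name);
    }
    vector = level[idx];
    type = type.getChildren().get(idx);
    level = vector instanceof StructColumnVector ? ((StructColumnVector) vector).fields : null;
  }
  return vector;
}
```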

##########
File path: java/core/src/java/org/apache/orc/filter/OrcFilterContext.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.filter;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.filter.MutableFilterContext;
+import org.apache.orc.TypeDescription;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * This defines the input for any filter operation.
+ * <p>
+ * It offers a convenience method for finding the column vector from a given name, that the filters
+ * can invoke to get access to the column vector.
+ */
+public class OrcFilterContext implements MutableFilterContext {

Review comment:
       In fact this is an extension of a VectorizedRowBatch with schema -- we can't conveniently extend VectorizedRowBatch, but we should mention the above in the doc.
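   Something along these lines could work for the class Javadoc (suggested wording only):

```java
/**
 * Wraps a {@link VectorizedRowBatch} together with the read schema. Conceptually this is an
 * extension of VectorizedRowBatch; since extending it directly is not convenient, the batch is
 * wrapped instead and filters use the schema to look up column vectors by name.
 */
```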

##########
File path: java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
##########
@@ -245,7 +263,8 @@ protected static IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind
       switch (kind) {
         case DIRECT_V2:
         case DICTIONARY_V2:
-          return new RunLengthIntegerReaderV2(in, signed, context == null ? false : context.isSkipCorrupt());
+          return new RunLengthIntegerReaderV2(in, signed,

Review comment:
       This change is also unrelated -- I would suggest making this part of the Reader changes. I will add more details in my review comment.

##########
File path: java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
##########
@@ -1221,52 +1244,112 @@ private boolean advanceToNextRow(
   @Override
   public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
     try {
-      if (rowInStripe >= rowCountInStripe) {
-        currentStripe += 1;
-        if (currentStripe >= stripes.size()) {
-          batch.size = 0;
-          return false;
+      int batchSize;
+
+      // do...while is required to handle the case where the filter eliminates all rows in the
+      // batch
+      do {
+        if (rowInStripe >= rowCountInStripe) {
+          currentStripe += 1;
+          if (currentStripe >= stripes.size()) {
+            batch.size = 0;
+            return false;
+          }
+          // Read stripe in Memory
+          readStripe();
+          followRowInStripe = rowInStripe;
         }
-        // Read stripe in Memory
-        readStripe();
-      }
 
-      int batchSize = computeBatchSize(batch.getMaxSize());
-      rowInStripe += batchSize;
-      reader.setVectorColumnCount(batch.getDataColumnCount());
-      reader.nextBatch(batch, batchSize);
-      advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
-      // batch.size can be modified by filter so only batchSize can tell if we actually read rows
+        batchSize = computeBatchSize(batch.getMaxSize());
+        reader.setVectorColumnCount(batch.getDataColumnCount());
+        reader.nextBatch(batch, batchSize, readLevel);
+        if (readLevel == ReadLevel.LEAD && batch.size > 0) {
+          prepareFollowingStreams(rowInStripe, followRowInStripe);

Review comment:
       Please add a comment here.
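   For example, something like this (my wording; the exact semantics of prepareFollowingStreams are assumed from the surrounding diff):

```java
// The filter kept at least one row, so plan and position the FOLLOW (non-filter)
// streams from where they were last read (followRowInStripe) up to the current
// position before their columns are decoded for this batch.
prepareFollowingStreams(rowInStripe, followRowInStripe);
```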

##########
File path: java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
##########
@@ -83,17 +85,21 @@
   private final boolean[] fileIncluded;
   private final long rowIndexStride;
   private long rowInStripe = 0;
+  private long followRowInStripe = 0;
   private int currentStripe = -1;
   private long rowBaseInStripe = 0;
   private long rowCountInStripe = 0;
   private final BatchReader reader;
   private final OrcIndex indexes;
+  private final boolean[] rowIndexCols;

Review comment:
       This is still not 100% clear -- maybe rowIndexColsToRead?
   As everything revolves around ReadLevel, let's make use of it.

##########
File path: java/core/src/java/org/apache/orc/impl/reader/tree/StructBatchReader.java
##########
@@ -15,62 +15,80 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.orc.impl.reader.tree;
 
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.filter.OrcFilterContext;
 import org.apache.orc.impl.TreeReaderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Set;
 
 public class StructBatchReader extends BatchReader {
+  private static final Logger LOG = LoggerFactory.getLogger(StructBatchReader.class);
   // The reader context including row-filtering details
   private final TreeReaderFactory.Context context;
+  private final TreeReaderFactory.StructTreeReader structReader;
+  private final OrcFilterContext fc;
 
-  public StructBatchReader(TreeReaderFactory.StructTreeReader rowReader, TreeReaderFactory.Context context) {
+  public StructBatchReader(TypeReader rowReader, TreeReaderFactory.Context context) {
     super(rowReader);
     this.context = context;
+    this.fc = new OrcFilterContext(context.getSchemaEvolution().getReaderSchema());
+    if (rowReader instanceof TreeReaderFactory.StructTreeReader) {
+      structReader = (TreeReaderFactory.StructTreeReader) rowReader;
+    } else {
+      structReader = (TreeReaderFactory.StructTreeReader) ((LevelTypeReader)rowReader).getReader();
+    }
   }
 
-  private void readBatchColumn(VectorizedRowBatch batch, TypeReader[] children, int batchSize, int index)
-      throws IOException {
+  private void readBatchColumn(VectorizedRowBatch batch,
+                               TypeReader[] children,
+                               int batchSize,
+                               int index,
+                               ReadLevel readLevel)
+    throws IOException {
     ColumnVector colVector = batch.cols[index];
     if (colVector != null) {
       colVector.reset();
       colVector.ensureSize(batchSize, false);
-      children[index].nextVector(colVector, null, batchSize, batch);
+      children[index].nextVector(colVector, null, batchSize, batch, readLevel);
     }
   }
 
   @Override
-  public void nextBatch(VectorizedRowBatch batch, int batchSize) throws IOException {
-    TypeReader[] children = ((TreeReaderFactory.StructTreeReader) rootType).fields;
-    // Early expand fields --> apply filter --> expand remaining fields
-    Set<Integer> earlyExpandCols = context.getColumnFilterIds();
+  public void nextBatch(VectorizedRowBatch batch, int batchSize, ReadLevel readLevel)
+    throws IOException {
+    nextBatchLevel(batch, batchSize, readLevel);
 
-    // Clear selected and early expand columns used in Filter
-    batch.selectedInUse = false;
-    for (int i = 0; i < children.length && !earlyExpandCols.isEmpty() &&
-        (vectorColumnCount == -1 || i < vectorColumnCount); ++i) {
-      if (earlyExpandCols.contains(children[i].getColumnId())) {
-        readBatchColumn(batch, children, batchSize, i);
+    if (readLevel == ReadLevel.LEAD) {
+      // Apply filter callback to reduce number of # rows selected for decoding in the next
+      // TreeReaders
+      if (this.context.getColumnFilterCallback() != null) {
+        this.context.getColumnFilterCallback().accept(fc.setBatch(batch));
       }
     }
-    // Since we are going to filter rows based on some column values set batch.size earlier here
-    batch.size = batchSize;
+  }
+
+  private void nextBatchLevel(VectorizedRowBatch batch, int batchSize, ReadLevel readLevel) throws IOException {
+    TypeReader[] children = structReader.fields;
 
-    // Apply filter callback to reduce number of # rows selected for decoding in the next TreeReaders
-    if (!earlyExpandCols.isEmpty() && this.context.getColumnFilterCallback() != null) {
-      this.context.getColumnFilterCallback().accept(batch);
+    if (readLevel != ReadLevel.FOLLOW) {
+      // In case of FOLLOW we leave the selectedInUse untouched.
+      batch.selectedInUse = false;

Review comment:
       Is this the right place to reset batch.selectedInUse? I would still keep this as a single method.
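   Roughly what I have in mind -- a sketch only, since the loop condition is reconstructed from the removed code and the per-level column selection may live elsewhere:

```java
@Override
public void nextBatch(VectorizedRowBatch batch, int batchSize, ReadLevel readLevel)
    throws IOException {
  TypeReader[] children = structReader.fields;
  if (readLevel != ReadLevel.FOLLOW) {
    // Only the LEAD/ALL pass owns the selected vector and the batch size;
    // the FOLLOW pass must not clobber what the filter produced.
    batch.selectedInUse = false;
    batch.size = batchSize;
  }
  for (int i = 0; i < children.length
      && (vectorColumnCount == -1 || i < vectorColumnCount); ++i) {
    readBatchColumn(batch, children, batchSize, i, readLevel);
  }
  if (readLevel == ReadLevel.LEAD && context.getColumnFilterCallback() != null) {
    // Apply the filter only after the LEAD columns have been decoded.
    context.getColumnFilterCallback().accept(fc.setBatch(batch));
  }
}
```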

##########
File path: java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
##########
@@ -234,11 +246,17 @@ protected RecordReaderImpl(ReaderImpl fileReader,
         // If the column is not present in the file then this can be ignored from read.
         if (expandColId != -1) {
           filterColIds.add(expandColId);
+          rowIndexCols[expandColId] = true;
         }
       }
       LOG.info("Filter Columns: " + filterColIds);
+      this.readLevel = ReadLevel.LEAD;
+    } else {
+      this.readLevel = ReadLevel.ALL;

Review comment:
       Maybe it's cleaner to make ReadLevel.ALL the default init value (non-final) and then change it here to LEAD only when filters are enabled?
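   i.e. something like this (a sketch of the field wiring, not the actual RecordReaderImpl code):

```java
// Default: no filters configured, so every column is read eagerly.
private ReadLevel readLevel = ReadLevel.ALL;

// ... later, in the constructor, only when filter columns were resolved:
if (!filterColIds.isEmpty()) {
  this.readLevel = ReadLevel.LEAD;   // restrict the eager pass to the filter columns
}
```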

##########
File path: java/core/src/java/org/apache/orc/impl/reader/StripePlanner.java
##########
@@ -68,14 +74,15 @@
   private String writerTimezone;
   private long currentStripeId;
   private long originalStripeId;
-  private Map<StreamName, StreamInformation> streams = new HashMap<>();
+  private final Map<StreamName, StreamInformation> streams = new HashMap<>();

Review comment:
       unrelated

##########
File path: java/core/src/java/org/apache/orc/impl/reader/StripePlanner.java
##########
@@ -363,11 +396,26 @@ private void addChunk(BufferChunkList list, StreamInformation stream,
         stream.firstChunk = chunk;
       }
       list.add(chunk);
+      stream.lastChunk = chunk;
       offset += thisLen;
       length -= thisLen;
     }
   }
 
+  private BufferChunkList addFollowChunk(long offset, long length) {

Review comment:
       Method never used? 

##########
File path: java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
##########
@@ -83,17 +85,24 @@
   private final boolean[] fileIncluded;
   private final long rowIndexStride;
   private long rowInStripe = 0;
+  private long followRowInStripe = 0;
   private int currentStripe = -1;
   private long rowBaseInStripe = 0;
   private long rowCountInStripe = 0;
   private final BatchReader reader;
   private final OrcIndex indexes;
+  // identifies the columns requiring row indexes
+  private final boolean[] rowIndexCols;
   private final SargApplier sargApp;
   // an array about which row groups aren't skipped
   private boolean[] includedRowGroups = null;
   private final DataReader dataReader;
   private final int maxDiskRangeChunkLimit;
   private final StripePlanner planner;
+  // identifies the type of read: FULL (no filters), LEAD (filters are present)
+  private final ReadLevel readLevel;

Review comment:
       Seems there is a FOLLOW type as well..

##########
File path: java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
##########
@@ -748,11 +763,11 @@ private static TruthValue checkInBloomFilter(BloomFilter bf,
     TruthValue result = hasNull ? TruthValue.NO_NULL : TruthValue.NO;
 
     if (predObj instanceof Long) {
-      if (bf.testLong(((Long) predObj).longValue())) {
+      if (bf.testLong((Long) predObj)) {

Review comment:
       All pred changes below seem unrelated?

##########
File path: java/core/src/java/org/apache/orc/impl/BufferChunkList.java
##########
@@ -22,6 +22,14 @@
  * Builds a list of buffer chunks
  */
 public class BufferChunkList {
+  public BufferChunk getHead() {

Review comment:
       Are these changes needed? The addFollowChunk method that uses getHead is never called.

##########
File path: java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
##########
@@ -257,8 +275,8 @@ protected RecordReaderImpl(ReaderImpl fileReader,
         new OrcProto.BloomFilterIndex[columns]);
 
     planner = new StripePlanner(evolution.getFileSchema(), encryption,
-        dataReader, writerVersion, ignoreNonUtf8BloomFilter,
-        maxDiskRangeChunkLimit);
+                                dataReader, writerVersion, ignoreNonUtf8BloomFilter,

Review comment:
       indentation seems a bit off?

##########
File path: java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
##########
@@ -2833,8 +2904,7 @@ public static TypeReader createTreeReader(TypeDescription readerType,
       case DATE:
         return new DateTreeReader(fileType.getId(), context);
       case DECIMAL:
-        if (version == OrcFile.Version.UNSTABLE_PRE_2_0 &&
-            fileType.getPrecision() <= TypeDescription.MAX_DECIMAL64_PRECISION){
+        if (isDecimalAsLong(version, fileType.getPrecision())){

Review comment:
       Unrelated change -- separate PR?

##########
File path: java/core/src/java/org/apache/orc/impl/reader/StripePlanner.java
##########
@@ -402,10 +450,18 @@ private BufferChunkList planIndexReading(boolean[] bloomFilterColumns) {
    * data.
    * @return a list of merged disk ranges to read
    */
-  private BufferChunkList planDataReading() {
+  private BufferChunkList planDataReading(ReadLevel readLevel) {
     BufferChunkList result = new BufferChunkList();
     for(StreamInformation stream: dataStreams) {
-      addChunk(result, stream, stream.offset, stream.length);
+      if (filterColIds.isEmpty()

Review comment:
       indentation? 

##########
File path: java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
##########
@@ -1170,6 +1189,10 @@ private void advanceStripe() throws IOException {
     }
   }
 
+  private int computeRGIdx(long rowIdx) {

Review comment:
       Let's add a doc comment here.
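   For example (suggested wording; the body is shown only to make the sketch concrete and may differ from the real implementation):

```java
/**
 * Maps a row position to the row group it belongs to within the current stripe.
 *
 * @param rowIdx row number relative to the start of the stripe
 * @return the index of the row group containing rowIdx, i.e. rowIdx / rowIndexStride
 */
private int computeRGIdx(long rowIdx) {
  return (int) (rowIdx / rowIndexStride);
}
```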

##########
File path: java/core/src/java/org/apache/orc/impl/reader/tree/ReadLevel.java
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.reader.tree;
+
+public enum ReadLevel {

Review comment:
       Shall we make this an inner class of LevelTypeReader?

##########
File path: java/core/src/java/org/apache/orc/impl/reader/tree/LevelStructBatchReader.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.reader.tree;
+
+import org.apache.orc.impl.TreeReaderFactory;
+
+public class LevelStructBatchReader extends StructBatchReader {

Review comment:
       Shall we make this an inner class of StructBatchReader? 

##########
File path: java/core/src/test/org/apache/orc/TestRowFilteringIOSkip.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.filter.OrcFilterContext;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Consumer;
+
+public class TestRowFilteringIOSkip {
+  private final static Logger LOG = LoggerFactory.getLogger(TestRowFilteringIOSkip.class);
+  private static final Path workDir = new Path(System.getProperty("test.tmp.dir",
+                                                                  "target" + File.separator + "test"

Review comment:
       indentation seems off in here as well

##########
File path: java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
##########
@@ -545,7 +545,7 @@ public int hashCode() {
 
     private long currentGeneration = 0;
 
-    private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {

Review comment:
       I believe it's the latter.

##########
File path: java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
##########
@@ -2758,34 +2818,44 @@ public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
     }
 
     @Override
-    public void startStripe(StripePlanner planner) throws IOException {
-      super.startStripe(planner);
+    public void startStripe(StripePlanner planner, ReadLevel readLevel) throws IOException {
+      super.startStripe(planner, readLevel);
       lengths = createIntegerReader(planner.getEncoding(columnId).getKind(),
           planner.getStream(new StreamName(columnId,
               OrcProto.Stream.Kind.LENGTH)), false, context);
       if (keyReader != null) {
-        keyReader.startStripe(planner);
+        keyReader.startStripe(planner, readLevel);
       }
       if (valueReader != null) {
-        valueReader.startStripe(planner);
+        valueReader.startStripe(planner, readLevel);
       }
     }
 
     @Override
-    public void skipRows(long items) throws IOException {
+    public void skipRows(long items, ReadLevel readLevel) throws IOException {
       items = countNonNulls(items);
       long childSkip = 0;
       for (long i = 0; i < items; ++i) {
         childSkip += lengths.next();
       }
-      keyReader.skipRows(childSkip);
-      valueReader.skipRows(childSkip);
+      keyReader.skipRows(childSkip, readLevel);
+      valueReader.skipRows(childSkip, readLevel);
     }
   }
 
   public static TypeReader createTreeReader(TypeDescription readerType,
-                                            Context context
-                                            ) throws IOException {
+                                            Context context) throws IOException {
+    TypeReader reader = createTreeReaderInternal(readerType, context);
+    if (context.getColumnFilterCallback() == null
+        || reader instanceof NullTreeReader) {

Review comment:
       indentation?

##########
File path: java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
##########
@@ -663,41 +691,46 @@ public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
     }
 
     @Override
-    public void startStripe(StripePlanner planner) throws IOException {
-      super.startStripe(planner);
+    public void startStripe(StripePlanner planner, ReadLevel readLevel) throws IOException {
+
+      super.startStripe(planner, readLevel);
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
       reader = createIntegerReader(planner.getEncoding(columnId).getKind(),
           planner.getStream(name), true, context);
     }
 
     @Override
-    public void seek(PositionProvider[] index) throws IOException {
-      seek(index[columnId]);
+    public void seek(PositionProvider[] index, ReadLevel readLevel) throws IOException {
+      seek(index[columnId], readLevel);
     }
 
     @Override
-    public void seek(PositionProvider index) throws IOException {
-      super.seek(index);
+    public void seek(PositionProvider index, ReadLevel readLevel) throws IOException {
+
+      super.seek(index, readLevel);
       reader.seek(index);
     }
 
     @Override
     public void nextVector(ColumnVector previousVector,
                            boolean[] isNull,
                            final int batchSize,
-                           FilterContext filterContext) throws IOException {
+                           FilterContext filterContext,
+                           ReadLevel readLevel) throws IOException {
+

Review comment:
       Totally agree -- there are empty line additions all over the PR.

##########
File path: site/develop/design/lazy_filter.md
##########
@@ -0,0 +1,352 @@
+* [Lazy Filter](#LazyFilter)
+  * [Background](#Background)
+  * [Design](#Design)
+    * [SArg to Filter](#SArgtoFilter)
+    * [Read](#Read)
+  * [Configuration](#Configuration)
+  * [Tests](#Tests)
+  * [Appendix](#Appendix)
+    * [Benchmarks](#Benchmarks)
+      * [Row vs Vector](#RowvsVector)
+      * [Filter](#Filter)
+
+# Lazy Filter <a id="LazyFilter"></a>
+
+## Background <a id="Background"></a>
+
+This feature request started as a result of a large search that is performed with the following characteristics:
+
+* The search fields are not part of partition, bucket or sort fields.
+* The table is a very large table.
+* The predicates result in very few rows compared to the scan size.
+* The search columns are a significant subset of selection columns in the query.
+
+Initial analysis showed that we could have a significant benefit by lazily reading the non-search columns only when we
+have a match. We explore the design and some benchmarks in subsequent sections.
+
+## Design <a id="Design"></a>
+
+This builds further on [ORC-577][ORC-577] which currently only restricts deserialization for some selected data types
+but does not improve on IO.
+
+On a high level the design includes the following components:
+
+```text
+┌──────────────┐          ┌────────────────────────┐
+│              │          │          Read          │
+│              │          │                        │
+│              │          │     ┌────────────┐     │
+│SArg to Filter│─────────▶│     │Read Filter │     │
+│              │          │     │  Columns   │     │
+│              │          │     └────────────┘     │
+│              │          │            │           │
+└──────────────┘          │            ▼           │
+                          │     ┌────────────┐     │
+                          │     │Apply Filter│     │
+                          │     └────────────┘     │
+                          │            │           │
+                          │            ▼           │
+                          │     ┌────────────┐     │
+                          │     │Read Select │     │
+                          │     │  Columns   │     │
+                          │     └────────────┘     │
+                          │                        │
+                          │                        │
+                          └────────────────────────┘
+```
+
+* **SArg to Filter**: Converts Search Arguments passed down into filters for efficient application during scans.
+* **Read**: Performs the lazy read using the filters.
+  * **Read Filter Columns**: Read the filter columns from the file.
+  * **Apply Filter**: Apply the filter on the read filter columns.
+  * **Read Select Columns**: If filter selects at least a row then read the remaining columns.
+
+### SArg to Filter <a id="SArgtoFilter"></a>
+
+SArg to Filter converts the passed SArg into a filter. This enables automatic compatibility with both Spark and Hive as
+they already push down Search Arguments down to ORC.
+
+The SArg is automatically converted into a [Vector Filter][vfilter]. Which is applied during the read process. Two
+filter types were evaluated:
+
+* [Row Filter][rfilter] that evaluates each row across all the predicates once.
+* [Vector Filter][vfilter] that evaluates each filter across the entire vector and adjusts the subsequent evaluation.
+
+While a row based filter is easier to code, it is much [slower][rowvvector] to process. We also see a significant
+[performance gain][rowvvector] in the absence of normalization.
+
+The builder for search argument should allow skipping normalization during the [build][build]. This has already been
+proposed as part of [HIVE-24458][HIVE-24458].
+
+### Read <a id="Read"></a>
+
+The read process has the following changes:
+
+```text
+                         │
+                         │
+                         │
+┌────────────────────────▼────────────────────────┐
+│               ┏━━━━━━━━━━━━━━━━┓                │
+│               ┃Plan ++Search++ ┃                │
+│               ┃    Columns     ┃                │
+│               ┗━━━━━━━━━━━━━━━━┛                │
+│                 Read   │Stripe                  │
+└────────────────────────┼────────────────────────┘
+                         │
+                         ▼
+
+
+                         │
+                         │
+┌────────────────────────▼────────────────────────┐
+│               ┏━━━━━━━━━━━━━━━━┓                │
+│               ┃Read ++Search++ ┃                │
+│               ┃    Columns     ┃◀─────────┐     │
+│               ┗━━━━━━━━━━━━━━━━┛          │     │
+│                        │              Size = 0  │
+│                        ▼                  │     │
+│               ┏━━━━━━━━━━━━━━━━┓          │     │
+│               ┃  Apply Filter  ┃──────────┘     │
+│               ┗━━━━━━━━━━━━━━━━┛                │
+│                    Size > 0                     │
+│                        │                        │
+│                        ▼                        │
+│               ┏━━━━━━━━━━━━━━━━┓                │
+│               ┃  Plan Select   ┃                │
+│               ┃    Columns     ┃                │
+│               ┗━━━━━━━━━━━━━━━━┛                │
+│                        │                        │
+│                        ▼                        │
+│               ┏━━━━━━━━━━━━━━━━┓                │
+│               ┃  Read Select   ┃                │
+│               ┃    Columns     ┃                │
+│               ┗━━━━━━━━━━━━━━━━┛                │
+│                   Next │Batch                   │
+└────────────────────────┼────────────────────────┘
+                         │
+                         ▼
+```
+
+The read process changes:
+
+* **Read Stripe** used to plan the read of all (search + select) columns. This is enhanced to plan and fetch only the
+  search columns. The rest of the stripe planning process optimizations remain unchanged e.g. partial read planning of
+  the stripe based on RowGroup statistics.
+* **Next Batch** identifies the processing that takes place when `RecordReader.nextBatch` is invoked.
+  * **Read Search Columns** takes place instead of reading all the selected columns. This is in sync with the planning
+    that has taken place during **Read Stripe** where only the search columns have been planned.
+  * **Apply Filter** on the batch that at this point only includes search columns. Evaluate the result of the filter:
+    * **Size = 0** indicates all records have been filtered out. Given this we proceed to the next batch of search
+      columns.
+    * **Size > 0** indicates that at least one record accepted by the filter. This record needs to be substantiated with
+      other columns.
+  * **Plan Select Columns** is invoked to perform read of the select columns. The planning happens as follows:
+    * Determine the current position of the read within the stripe and plan the read for the select columns from this
+      point forward to the end of the stripe.
+    * The Read planning of select columns respects the row groups filtered out as a result of the stripe planning.
+    * Fetch the select columns using the above plan.
+  * **Read Select Columns** into the vectorized row batch
+  * Return this batch.
+
+The current implementation performs a single read for the select columns in a stripe.
+
+```text
+┌──────────────────────────────────────────────────┐
+│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
+│ │RG0 │ │RG1 │ │RG2■│ │RG3 │ │RG4 │ │RG5■│ │RG6 │ │
+│ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │
+│                      Stripe                      │
+└──────────────────────────────────────────────────┘
+```
+
+The above diagram depicts a stripe with 7 Row Groups out of which **RG2** and **RG5** are selected by the filter. The
+current implementation does the following:
+
+* Start the read planning process from the first match RG2
+* Read to the end of the stripe that includes RG6
+* Based on the above fetch skips RG0 and RG1 subject to compression block boundaries
+
+The above logic could be enhanced to perform say **2 or n** reads before reading to the end of stripe. The current
+implementation allows 0 reads before reading to the end of the stripe. The value of **n** could be configurable but
+should avoid too many short reads.
+
+The read behavior changes as follows with multiple reads being allowed within a stripe for select columns:
+
+```text
+┌──────────────────────────────────────────────────┐
+│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
+│ │    │ │    │ │■■■■│ │■■■■│ │■■■■│ │■■■■│ │■■■■│ │
+│ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │
+│              Current implementation              │
+└──────────────────────────────────────────────────┘
+┌──────────────────────────────────────────────────┐
+│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
+│ │    │ │    │ │■■■■│ │    │ │    │ │■■■■│ │■■■■│ │
+│ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │
+│               Allow 1 partial read               │
+└──────────────────────────────────────────────────┘
+```
+
+The figure shows that we could read significantly fewer bytes by performing an additional read before reading to the end
+of stripe. This shall be included as a subsequent enhancement to this patch.
+
+## Configuration <a id="Configuration"></a>
+
+The following configuration options are exposed that control the filter behavior:
+
+|Property                   |Type   |Default|
+|:---                       |:---   |:---   |
+|orc.sarg.to.filter         |boolean|false  |
+|orc.sarg.to.filter.selected|boolean|false  |
+
+* `orc.sarg.to.filter` can be used to turn off the SArg to filter conversion. This might be particularly relevant in
+  cases where the filter is expensive and does not eliminate a lot of records.
+* `orc.sarg.to.filter.selected` is an important setting that if incorrectly enabled results in wrong output. The
+  `VectorizedRowBatch` has a selected vector that defines which rows are selected. This property should be set to `true`
+  only if the consumer respects the selected vector in determining the valid rows.
+
+## Tests <a id="Tests"></a>
+
+We evaluated this patch against a search job with the following stats:
+
+* Table
+  * Size: ~**420 TB**
+  * Data fields: ~**120**
+  * Partition fields: **3**
+* Scan
+  * Search fields: 3 data fields with large (~ 1000 value) IN clauses compounded by **OR**.
+  * Select fields: 16 data fields (includes the 3 search fields), 1 partition field
+  * Search:
+    * Size: ~**180 TB**
+    * Records: **3.99 T**
+  * Selected:
+    * Size: ~**100 MB**
+    * Records: **1 M**
+
+We have observed the following reductions:
+
+|Test    |IO Reduction %|CPU Reduction %|
+|:---    |          ---:|           ---:|
+|Same    |            45|             47|
+|SELECT *|            70|             87|
+
+* The savings are more significant as you increase the number of select columns with respect to the search columns
+* When the filter selects most data, no significant penalty observed as a result of 2 IO compared with a single IO
+  * We do have a penalty as a result of the filter application on the selected records.
+
+## Appendix <a id="Appendix"></a>
+
+### Benchmarks <a id="Benchmarks"></a>
+
+#### Row vs Vector <a id="RowvsVector"></a>
+
+We start with a decision of using a Row filter vs a Vector filter. The Row filter has the advantage of simpler code vs
+the Vector filter.
+
+```bash
+java -jar target/orc-benchmarks-core-1.7.0-SNAPSHOT-uber.jar filter simple
+```
+
+|Benchmark               |(fInSize)|(fType)|Mode| Cnt| Score|Error  |Units|
+|:---                    |     ---:|:---   |:---|---:|  ---:|:---   |:--- |
+|SimpleFilterBench.filter|        4|row    |avgt|  20|52.260|± 0.109|us/op|
+|SimpleFilterBench.filter|        4|vector |avgt|  20|19.699|± 0.044|us/op|
+|SimpleFilterBench.filter|        8|row    |avgt|  20|59.648|± 0.179|us/op|
+|SimpleFilterBench.filter|        8|vector |avgt|  20|28.655|± 0.036|us/op|
+|SimpleFilterBench.filter|       16|row    |avgt|  20|56.480|± 0.190|us/op|
+|SimpleFilterBench.filter|       16|vector |avgt|  20|46.757|± 0.124|us/op|
+|SimpleFilterBench.filter|       32|row    |avgt|  20|57.780|± 0.111|us/op|
+|SimpleFilterBench.filter|       32|vector |avgt|  20|52.060|± 0.333|us/op|
+|SimpleFilterBench.filter|      256|row    |avgt|  20|50.898|± 0.275|us/op|
+|SimpleFilterBench.filter|      256|vector |avgt|  20|85.684|± 0.351|us/op|
+
+Explanation:
+
+* **fInSize** calls out the number of values in the IN clause.
+* **fType** calls out the whether the filter is a row based filter, or a vector based filter.
+
+Observations:
+
+* The vector based filter is significantly faster than the row based filter.
+  * At best, vector was faster by **59.62%**
+  * At worst, vector was faster by **32.14%**
+* The performance of the filters is deteriorates with the increase of the IN values, however even in this case the
+  vector filter is much better than the row filter.
+
+In the next test we use a complex filter with both AND, and OR to understand the impact of Conjunctive Normal Form on

Review comment:
       * evaluation happens on every VectorBatch (1024 rows by default) 

##########
File path: java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
##########
@@ -83,17 +85,21 @@
   private final boolean[] fileIncluded;
   private final long rowIndexStride;
   private long rowInStripe = 0;
+  private long followRowInStripe = 0;
   private int currentStripe = -1;
   private long rowBaseInStripe = 0;
   private long rowCountInStripe = 0;
   private final BatchReader reader;
   private final OrcIndex indexes;
+  private final boolean[] rowIndexCols;
   private final SargApplier sargApp;
   // an array about which row groups aren't skipped
   private boolean[] includedRowGroups = null;
   private final DataReader dataReader;
   private final int maxDiskRangeChunkLimit;
   private final StripePlanner planner;
+  private final ReadLevel readLevel;
+  private boolean needFollowStripe;

Review comment:
       Does this mean the follow streams have been read, or that they still need to be read?
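   If it is the latter, a comment such as the following would help (suggested wording, assuming the flag tracks pending work):

```java
// true while the FOLLOW (non-filter) streams of the current stripe still need
// to be planned and read
private boolean needFollowStripe;
```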

##########
File path: site/develop/design/lazy_filter.md
##########
@@ -0,0 +1,352 @@
+* [Lazy Filter](#LazyFilter)
+  * [Background](#Background)
+  * [Design](#Design)
+    * [SArg to Filter](#SArgtoFilter)
+    * [Read](#Read)
+  * [Configuration](#Configuration)
+  * [Tests](#Tests)
+  * [Appendix](#Appendix)
+    * [Benchmarks](#Benchmarks)
+      * [Row vs Vector](#RowvsVector)
+      * [Filter](#Filter)
+
+# Lazy Filter <a id="LazyFilter"></a>
+
+## Background <a id="Background"></a>
+
+This feature request started as a result of a large search that is performed with the following characteristics:
+
+* The search fields are not part of partition, bucket or sort fields.
+* The table is a very large table.
+* The predicates result in very few rows compared to the scan size.
+* The search columns are a significant subset of selection columns in the query.
+
+Initial analysis showed that we could have a significant benefit by lazily reading the non-search columns only when we
+have a match. We explore the design and some benchmarks in subsequent sections.
+
+## Design <a id="Design"></a>
+
+This builds further on [ORC-577][ORC-577] which currently only restricts deserialization for some selected data types
+but does not improve on IO.
+
+On a high level the design includes the following components:
+
+```text
+┌──────────────┐          ┌────────────────────────┐
+│              │          │          Read          │
+│              │          │                        │
+│              │          │     ┌────────────┐     │
+│SArg to Filter│─────────▶│     │Read Filter │     │
+│              │          │     │  Columns   │     │
+│              │          │     └────────────┘     │
+│              │          │            │           │
+└──────────────┘          │            ▼           │
+                          │     ┌────────────┐     │
+                          │     │Apply Filter│     │
+                          │     └────────────┘     │
+                          │            │           │
+                          │            ▼           │
+                          │     ┌────────────┐     │
+                          │     │Read Select │     │
+                          │     │  Columns   │     │
+                          │     └────────────┘     │
+                          │                        │
+                          │                        │
+                          └────────────────────────┘
+```
+
+* **SArg to Filter**: Converts Search Arguments passed down into filters for efficient application during scans.
+* **Read**: Performs the lazy read using the filters.
+  * **Read Filter Columns**: Read the filter columns from the file.
+  * **Apply Filter**: Apply the filter on the read filter columns.
+  * **Read Select Columns**: If filter selects at least a row then read the remaining columns.
+
+### SArg to Filter <a id="SArgtoFilter"></a>
+
+SArg to Filter converts the passed SArg into a filter. This enables automatic compatibility with both Spark and Hive as
+they already push down Search Arguments down to ORC.
+
+The SArg is automatically converted into a [Vector Filter][vfilter]. Which is applied during the read process. Two
+filter types were evaluated:
+
+* [Row Filter][rfilter] that evaluates each row across all the predicates once.
+* [Vector Filter][vfilter] that evaluates each filter across the entire vector and adjusts the subsequent evaluation.
+
+While a row based filter is easier to code, it is much [slower][rowvvector] to process. We also see a significant
+[performance gain][rowvvector] in the absence of normalization.
+
+The builder for search argument should allow skipping normalization during the [build][build]. This has already been
+proposed as part of [HIVE-24458][HIVE-24458].
+
+### Read <a id="Read"></a>
+
+The read process has the following changes:
+
+```text
+                         │
+                         │
+                         │
+┌────────────────────────▼────────────────────────┐
+│               ┏━━━━━━━━━━━━━━━━┓                │
+│               ┃Plan ++Search++ ┃                │
+│               ┃    Columns     ┃                │
+│               ┗━━━━━━━━━━━━━━━━┛                │
+│                 Read   │Stripe                  │
+└────────────────────────┼────────────────────────┘
+                         │
+                         ▼
+
+
+                         │
+                         │
+┌────────────────────────▼────────────────────────┐
+│               ┏━━━━━━━━━━━━━━━━┓                │
+│               ┃Read ++Search++ ┃                │
+│               ┃    Columns     ┃◀─────────┐     │
+│               ┗━━━━━━━━━━━━━━━━┛          │     │
+│                        │              Size = 0  │
+│                        ▼                  │     │
+│               ┏━━━━━━━━━━━━━━━━┓          │     │
+│               ┃  Apply Filter  ┃──────────┘     │
+│               ┗━━━━━━━━━━━━━━━━┛                │
+│                    Size > 0                     │
+│                        │                        │
+│                        ▼                        │
+│               ┏━━━━━━━━━━━━━━━━┓                │
+│               ┃  Plan Select   ┃                │
+│               ┃    Columns     ┃                │
+│               ┗━━━━━━━━━━━━━━━━┛                │
+│                        │                        │
+│                        ▼                        │
+│               ┏━━━━━━━━━━━━━━━━┓                │
+│               ┃  Read Select   ┃                │
+│               ┃    Columns     ┃                │
+│               ┗━━━━━━━━━━━━━━━━┛                │
+│                   Next │Batch                   │
+└────────────────────────┼────────────────────────┘
+                         │
+                         ▼
+```
+
+The read process changes:
+
+* **Read Stripe** used to plan the read of all (search + select) columns. This is enhanced to plan and fetch only the
+  search columns. The rest of the stripe planning process optimizations remain unchanged e.g. partial read planning of
+  the stripe based on RowGroup statistics.
+* **Next Batch** identifies the processing that takes place when `RecordReader.nextBatch` is invoked.
+  * **Read Search Columns** takes place instead of reading all the selected columns. This is in sync with the planning
+    that has taken place during **Read Stripe** where only the search columns have been planned.
+  * **Apply Filter** on the batch that at this point only includes search columns. Evaluate the result of the filter:
+    * **Size = 0** indicates all records have been filtered out. Given this we proceed to the next batch of search
+      columns.
+    * **Size > 0** indicates that at least one record accepted by the filter. This record needs to be substantiated with
+      other columns.
+  * **Plan Select Columns** is invoked to perform read of the select columns. The planning happens as follows:
+    * Determine the current position of the read within the stripe and plan the read for the select columns from this
+      point forward to the end of the stripe.
+    * The Read planning of select columns respects the row groups filtered out as a result of the stripe planning.
+    * Fetch the select columns using the above plan.
+  * **Read Select Columns** into the vectorized row batch
+  * Return this batch.
+
+The current implementation performs a single read for the select columns in a stripe.
+
+```text
+┌──────────────────────────────────────────────────┐
+│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
+│ │RG0 │ │RG1 │ │RG2■│ │RG3 │ │RG4 │ │RG5■│ │RG6 │ │
+│ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │
+│                      Stripe                      │
+└──────────────────────────────────────────────────┘
+```
+
+The above diagram depicts a stripe with 7 Row Groups out of which **RG2** and **RG5** are selected by the filter. The
+current implementation does the following:
+
+* Start the read planning process from the first match RG2
+* Read to the end of the stripe that includes RG6
+* Based on the above fetch skips RG0 and RG1 subject to compression block boundaries
+
+The above logic could be enhanced to perform say **2 or n** reads before reading to the end of stripe. The current
+implementation allows 0 reads before reading to the end of the stripe. The value of **n** could be configurable but
+should avoid too many short reads.
+
+The read behavior changes as follows with multiple reads being allowed within a stripe for select columns:
+
+```text
+┌──────────────────────────────────────────────────┐
+│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
+│ │    │ │    │ │■■■■│ │■■■■│ │■■■■│ │■■■■│ │■■■■│ │
+│ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │
+│              Current implementation              │
+└──────────────────────────────────────────────────┘
+┌──────────────────────────────────────────────────┐
+│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
+│ │    │ │    │ │■■■■│ │    │ │    │ │■■■■│ │■■■■│ │
+│ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │
+│               Allow 1 partial read               │
+└──────────────────────────────────────────────────┘
+```
+
+The figure shows that we could read significantly fewer bytes by performing an additional read before reading to the end

Review comment:
       Is partial read conf part of ORC-743? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

