This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/orc.git

commit 431c5212f0c5bdf9709c41f4c32e7fdc6f193086
Author: Zoltan Ratkai <[email protected]>
AuthorDate: Thu May 18 00:02:58 2023 -0700

    ORC-1413: Fix for ORC row level filter issue with ACID table
    
    This PR fixes the ORC row-level filter for ACID tables: column lookup
    now descends into the nested "row" struct of the ACID event schema
    instead of searching only the top-level batch columns.
    
    Without this fix, Hive cannot use ORC 1.8.3 with ACID tables when the
    row-level filter is enabled.
    
    Unit test added.
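
    For context, a minimal sketch of the lookup this change enables
    (illustrative only, not part of the committed patch; it assumes the
    standard ACID event schema produced by SchemaEvolution.createEventSchema):

        // The ACID event schema wraps the user columns in a trailing "row" struct:
        //   struct<operation:int,originalTransaction:bigint,bucket:int,
        //          rowId:bigint,currentTransaction:bigint,
        //          row:struct<int1:int,string1:string>>
        TypeDescription userSchema =
            TypeDescription.fromString("struct<int1:int,string1:string>");
        TypeDescription acidSchema = SchemaEvolution.createEventSchema(userSchema);

        // Before this fix, findColumnVector searched only the top-level batch
        // columns and could not resolve user columns nested under batch.cols[5]
        // (the "row" struct); ParserUtils.findColumnData now unwraps that
        // struct first.
        OrcFilterContext ctx = new OrcFilterContextImpl(acidSchema, true)
            .setBatch(acidSchema.createRowBatch());
        ColumnVector[] found = ctx.findColumnVector("string1");  // resolves row.string1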
    
    Closes #1495 from zratkai/ORC-1413-1.8.
    
    Authored-by: Zoltan Ratkai <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../src/java/org/apache/orc/impl/ParserUtils.java  |  17 ++-
 .../test/org/apache/orc/TestOrcFilterContext.java  | 132 ++++++++++++++++++++-
 2 files changed, 143 insertions(+), 6 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/impl/ParserUtils.java b/java/core/src/java/org/apache/orc/impl/ParserUtils.java
index 84bf049df..1afd6c0fc 100644
--- a/java/core/src/java/org/apache/orc/impl/ParserUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/ParserUtils.java
@@ -411,18 +411,22 @@ public class ParserUtils {
     private final ColumnVector[] result;
     private int resultIdx = 0;
 
-    ColumnFinder(TypeDescription schema, VectorizedRowBatch batch, int levels) {
+    ColumnFinder(TypeDescription schema, ColumnVector[] columnVectors, int levels) {
       if (schema.getCategory() == TypeDescription.Category.STRUCT) {
-        top = batch.cols;
+        top = columnVectors;
         result = new ColumnVector[levels];
       } else {
         result = new ColumnVector[levels + 1];
-        current = batch.cols[0];
+        current = columnVectors[0];
         top = null;
         addResult(current);
       }
     }
 
+    ColumnFinder(TypeDescription schema, VectorizedRowBatch vectorizedRowBatch, int levels) {
+      this(schema, vectorizedRowBatch.cols, levels);
+    }
+
     private void addResult(ColumnVector vector) {
       result[resultIdx] = vector;
       resultIdx += 1;
@@ -459,8 +463,11 @@ public class ParserUtils {
                                                  boolean isCaseSensitive,
                                                  VectorizedRowBatch batch) {
     List<String> names = ParserUtils.splitName(source);
-    ColumnFinder result = new ColumnFinder(schema, batch, names.size());
-    findColumn(removeAcid(schema), names, isCaseSensitive, result);
+    TypeDescription schemaToUse = removeAcid(schema);
+    ColumnVector[] columnVectors = SchemaEvolution.checkAcidSchema(schema)
+                  ? ((StructColumnVector) batch.cols[batch.cols.length - 1]).fields : batch.cols;
+    ColumnFinder result = new ColumnFinder(schemaToUse, columnVectors, names.size());
+    findColumn(schemaToUse, names, isCaseSensitive, result);
     return result.result;
   }
 
diff --git a/java/core/src/test/org/apache/orc/TestOrcFilterContext.java b/java/core/src/test/org/apache/orc/TestOrcFilterContext.java
index c38e9081d..8abc5a0d1 100644
--- a/java/core/src/test/org/apache/orc/TestOrcFilterContext.java
+++ b/java/core/src/test/org/apache/orc/TestOrcFilterContext.java
@@ -25,10 +25,24 @@ import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.orc.impl.OrcFilterContextImpl;
+import org.apache.orc.impl.SchemaEvolution;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+import java.util.Arrays;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -61,9 +75,22 @@ public class TestOrcFilterContext {
                                             TypeDescription.createList(TypeDescription.createChar()))
                 )
     );
+  private static Configuration configuration;
+  private static FileSystem fileSystem;
+  private static final Path workDir = new Path(System.getProperty("test.tmp.dir",
+          "target" + File.separator + "test"
+                  + File.separator + "tmp"));
+  private static final Path filePath = new Path(workDir, "orc_filter_file.orc");
+
+  private static final int RowCount = 400;
+
   private final OrcFilterContext filterContext = new OrcFilterContextImpl(schema, false)
     .setBatch(schema.createRowBatch());
-
+  TypeDescription typeDescriptionACID =
+          TypeDescription.fromString("struct<int1:int,string1:string>");
+  TypeDescription acidSchema = SchemaEvolution.createEventSchema(typeDescriptionACID);
+  private final OrcFilterContext filterContextACID = new OrcFilterContextImpl(acidSchema, true)
+          .setBatch(acidSchema.createRowBatch());
   @BeforeEach
   public void setup() {
     filterContext.reset();
@@ -225,4 +252,107 @@ public class TestOrcFilterContext {
     assertTrue(OrcFilterContext.isNull(vectorBranch, 1));
     assertTrue(OrcFilterContext.isNull(vectorBranch, 2));
   }
+  
+  @Test
+  public void testACIDTable() {
+    ColumnVector[] columnVector = filterContextACID.findColumnVector("string1");
+    assertEquals(1, columnVector.length);
+    assertTrue(columnVector[0] instanceof BytesColumnVector, "Expected a BytesColumnVector, but found " + columnVector[0].getClass());
+    columnVector = filterContextACID.findColumnVector("int1");
+    assertEquals(1, columnVector.length);
+    assertTrue(columnVector[0] instanceof LongColumnVector, "Expected a LongColumnVector, but found " + columnVector[0].getClass());
+  }
+
+  @Test
+  public void testRowFilterWithACIDTable() throws IOException {
+    createAcidORCFile();
+    readSingleRowWithFilter(new Random().nextInt(RowCount));
+    fileSystem.delete(filePath, false);
+    
+  }
+
+  private void createAcidORCFile() throws IOException {
+    configuration = new Configuration();
+    fileSystem = FileSystem.get(configuration);
+
+    try (Writer writer = OrcFile.createWriter(filePath,
+            OrcFile.writerOptions(configuration)
+                    .fileSystem(fileSystem)
+                    .overwrite(true)
+                    .rowIndexStride(8192)
+                    .setSchema(acidSchema))) {
+
+      Random random = new Random(1024);
+      VectorizedRowBatch vectorizedRowBatch = acidSchema.createRowBatch();
+      for (int rowId = 0; rowId < RowCount; rowId++) {
+        long v = random.nextLong();
+        populateColumnValues(acidSchema, vectorizedRowBatch.cols, vectorizedRowBatch.size, v);
+        // Populate the rowId
+        ((LongColumnVector) vectorizedRowBatch.cols[3]).vector[vectorizedRowBatch.size] = rowId;
+        StructColumnVector row = (StructColumnVector) vectorizedRowBatch.cols[5];
+        ((LongColumnVector) row.fields[0]).vector[vectorizedRowBatch.size] = rowId;
+        vectorizedRowBatch.size += 1;
+        if (vectorizedRowBatch.size == vectorizedRowBatch.getMaxSize()) {
+          writer.addRowBatch(vectorizedRowBatch);
+          vectorizedRowBatch.reset();
+        }
+      }
+      if (vectorizedRowBatch.size > 0) {
+        writer.addRowBatch(vectorizedRowBatch);
+        vectorizedRowBatch.reset();
+      }
+    }
+  }
+  
+  private void populateColumnValues(TypeDescription typeDescription, ColumnVector[] columnVectors, int index, long value) {
+    for (int columnId = 0; columnId < typeDescription.getChildren().size(); columnId++) {
+      switch (typeDescription.getChildren().get(columnId).getCategory()) {
+        case INT:
+          ((LongColumnVector)columnVectors[columnId]).vector[index] = value;
+          break;
+        case LONG:
+          ((LongColumnVector)columnVectors[columnId]).vector[index] = value;
+          break;
+        case STRING:
+          ((BytesColumnVector) columnVectors[columnId]).setVal(index,
+                  ("String-"+ index).getBytes(StandardCharsets.UTF_8));
+          break;
+        case STRUCT:
+          populateColumnValues(typeDescription.getChildren().get(columnId), ((StructColumnVector) columnVectors[columnId]).fields, index, value);
+          break;           
+        default:
+          throw new IllegalArgumentException();
+      }
+    }
+  }
+  
+  private void readSingleRowWithFilter(int id) throws IOException {
+    Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration).filesystem(fileSystem));
+    SearchArgument searchArgument = SearchArgumentFactory.newBuilder()
+            .in("int1", PredicateLeaf.Type.LONG, Long.valueOf(id))
+            .build();
+    Reader.Options readerOptions = reader.options()
+            .searchArgument(searchArgument, new String[] {"int1"})
+            .useSelected(true)
+            .allowSARGToFilter(true);
+    VectorizedRowBatch vectorizedRowBatch = acidSchema.createRowBatch();
+    long rowCount = 0;
+    try (RecordReader recordReader = reader.rows(readerOptions)) {
+      assertTrue(recordReader.nextBatch(vectorizedRowBatch));
+      rowCount += vectorizedRowBatch.size;
+      assertEquals(6, vectorizedRowBatch.cols.length);
+      assertTrue(vectorizedRowBatch.cols[5] instanceof StructColumnVector);
+      assertTrue(((StructColumnVector) vectorizedRowBatch.cols[5]).fields[0] instanceof LongColumnVector);
+      assertTrue(((StructColumnVector) vectorizedRowBatch.cols[5]).fields[1] instanceof BytesColumnVector);
+      assertEquals(id, ((LongColumnVector) ((StructColumnVector) vectorizedRowBatch.cols[5]).fields[0]).vector[vectorizedRowBatch.selected[0]]);
+      checkStringColumn(id, vectorizedRowBatch);
+      assertFalse(recordReader.nextBatch(vectorizedRowBatch));
+    }
+    assertEquals(1, rowCount);
+  }
+
+  private static void checkStringColumn(int id, VectorizedRowBatch vectorizedRowBatch) {
+    BytesColumnVector bytesColumnVector = (BytesColumnVector) ((StructColumnVector) vectorizedRowBatch.cols[5]).fields[1];
+    assertEquals("String-"+ id, bytesColumnVector.toString(id));
+  }
 }
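
For anyone exercising this downstream, a condensed sketch of the filtered ACID
read path covered by the new test (the file path, configuration, and predicate
value are placeholders; it assumes an ACID file written with the event schema):

    Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
    SearchArgument sarg = SearchArgumentFactory.newBuilder()
        .in("int1", PredicateLeaf.Type.LONG, 42L)    // predicate on a user column
        .build();
    Reader.Options options = reader.options()
        .searchArgument(sarg, new String[] {"int1"})
        .useSelected(true)
        .allowSARGToFilter(true);                    // evaluate the SARG as a row-level filter
    try (RecordReader rows = reader.rows(options)) {
      VectorizedRowBatch batch = reader.getSchema().createRowBatch();
      while (rows.nextBatch(batch)) {
        // With the fix, "int1" resolves inside the ACID "row" struct
        // (batch.cols[5]) instead of failing against the top-level columns.
      }
    }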
