This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/orc.git
commit 431c5212f0c5bdf9709c41f4c32e7fdc6f193086
Author: Zoltan Ratkai <[email protected]>
AuthorDate: Thu May 18 00:02:58 2023 -0700

    ORC-1413: Fix for ORC row level filter issue with ACID table

    This PR fixes the ORC row level filter issue with ACID tables. Without
    this fix, Hive cannot work with ORC 1.8.3 when an ACID table is read
    with the row level filter enabled. A unit test is added.

    Closes #1495 from zratkai/ORC-1413-1.8.

    Authored-by: Zoltan Ratkai <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../src/java/org/apache/orc/impl/ParserUtils.java |  17 ++-
 .../test/org/apache/orc/TestOrcFilterContext.java | 132 ++++++++++++++++++++-
 2 files changed, 143 insertions(+), 6 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/impl/ParserUtils.java b/java/core/src/java/org/apache/orc/impl/ParserUtils.java
index 84bf049df..1afd6c0fc 100644
--- a/java/core/src/java/org/apache/orc/impl/ParserUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/ParserUtils.java
@@ -411,18 +411,22 @@ public class ParserUtils {
     private final ColumnVector[] result;
     private int resultIdx = 0;
 
-    ColumnFinder(TypeDescription schema, VectorizedRowBatch batch, int levels) {
+    ColumnFinder(TypeDescription schema, ColumnVector[] columnVectors, int levels) {
       if (schema.getCategory() == TypeDescription.Category.STRUCT) {
-        top = batch.cols;
+        top = columnVectors;
         result = new ColumnVector[levels];
       } else {
         result = new ColumnVector[levels + 1];
-        current = batch.cols[0];
+        current = columnVectors[0];
         top = null;
         addResult(current);
       }
     }
 
+    ColumnFinder(TypeDescription schema, VectorizedRowBatch vectorizedRowBatch, int levels) {
+      this(schema, vectorizedRowBatch.cols, levels);
+    }
+
     private void addResult(ColumnVector vector) {
       result[resultIdx] = vector;
       resultIdx += 1;
@@ -459,8 +463,11 @@ public class ParserUtils {
                                                  boolean isCaseSensitive,
                                                  VectorizedRowBatch batch) {
     List<String> names = ParserUtils.splitName(source);
-    ColumnFinder result = new ColumnFinder(schema, batch, names.size());
-    findColumn(removeAcid(schema), names, isCaseSensitive, result);
+    TypeDescription schemaToUse = removeAcid(schema);
+    ColumnVector[] columnVectors = SchemaEvolution.checkAcidSchema(schema)
+        ? ((StructColumnVector) batch.cols[batch.cols.length - 1]).fields : batch.cols;
+    ColumnFinder result = new ColumnFinder(schemaToUse, columnVectors, names.size());
+    findColumn(schemaToUse, names, isCaseSensitive, result);
     return result.result;
   }
 
diff --git a/java/core/src/test/org/apache/orc/TestOrcFilterContext.java b/java/core/src/test/org/apache/orc/TestOrcFilterContext.java
index c38e9081d..8abc5a0d1 100644
--- a/java/core/src/test/org/apache/orc/TestOrcFilterContext.java
+++ b/java/core/src/test/org/apache/orc/TestOrcFilterContext.java
@@ -25,10 +25,24 @@ import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.orc.impl.OrcFilterContextImpl;
+import org.apache.orc.impl.SchemaEvolution;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+import java.util.Arrays;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -61,9 +75,22 @@ public class TestOrcFilterContext {
           TypeDescription.createList(TypeDescription.createChar()))
       )
   );
+  private static Configuration configuration;
+  private static FileSystem fileSystem;
+  private static final Path workDir = new Path(System.getProperty("test.tmp.dir",
+      "target" + File.separator + "test"
+          + File.separator + "tmp"));
+  private static final Path filePath = new Path(workDir, "orc_filter_file.orc");
+
+  private static final int RowCount = 400;
+
   private final OrcFilterContext filterContext = new OrcFilterContextImpl(schema, false)
       .setBatch(schema.createRowBatch());
-
+  TypeDescription typeDescriptionACID =
+      TypeDescription.fromString("struct<int1:int,string1:string>");
+  TypeDescription acidSchema = SchemaEvolution.createEventSchema(typeDescriptionACID);
+  private final OrcFilterContext filterContextACID = new OrcFilterContextImpl(acidSchema, true)
+      .setBatch(acidSchema.createRowBatch());
   @BeforeEach
   public void setup() {
     filterContext.reset();
@@ -225,4 +252,107 @@ public class TestOrcFilterContext {
     assertTrue(OrcFilterContext.isNull(vectorBranch, 1));
     assertTrue(OrcFilterContext.isNull(vectorBranch, 2));
   }
+
+  @Test
+  public void testACIDTable() {
+    ColumnVector[] columnVector = filterContextACID.findColumnVector("string1");
+    assertEquals(1, columnVector.length);
+    assertTrue(columnVector[0] instanceof BytesColumnVector,
+        "Expected a BytesColumnVector, but found " + columnVector[0].getClass());
+
+    columnVector = filterContextACID.findColumnVector("int1");
+    assertEquals(1, columnVector.length);
+    assertTrue(columnVector[0] instanceof LongColumnVector,
+        "Expected a LongColumnVector, but found " + columnVector[0].getClass());
+  }
+
+  @Test
+  public void testRowFilterWithACIDTable() throws IOException {
+    createAcidORCFile();
+    readSingleRowWithFilter(new Random().nextInt(RowCount));
+    fileSystem.delete(filePath, false);
+  }
+
+  private void createAcidORCFile() throws IOException {
+    configuration = new Configuration();
+    fileSystem = FileSystem.get(configuration);
+
+    try (Writer writer = OrcFile.createWriter(filePath,
+        OrcFile.writerOptions(configuration)
+            .fileSystem(fileSystem)
+            .overwrite(true)
+            .rowIndexStride(8192)
+            .setSchema(acidSchema))) {
+
+      Random random = new Random(1024);
+      VectorizedRowBatch vectorizedRowBatch = acidSchema.createRowBatch();
+      for (int rowId = 0; rowId < RowCount; rowId++) {
+        long v = random.nextLong();
+        populateColumnValues(acidSchema, vectorizedRowBatch.cols, vectorizedRowBatch.size, v);
+        // Populate the rowId
+        ((LongColumnVector) vectorizedRowBatch.cols[3]).vector[vectorizedRowBatch.size] = rowId;
+        StructColumnVector row = (StructColumnVector) vectorizedRowBatch.cols[5];
+        ((LongColumnVector) row.fields[0]).vector[vectorizedRowBatch.size] = rowId;
+        vectorizedRowBatch.size += 1;
+        if (vectorizedRowBatch.size == vectorizedRowBatch.getMaxSize()) {
+          writer.addRowBatch(vectorizedRowBatch);
+          vectorizedRowBatch.reset();
+        }
+      }
+      if (vectorizedRowBatch.size > 0) {
+        writer.addRowBatch(vectorizedRowBatch);
+        vectorizedRowBatch.reset();
+      }
+    }
+  }
+
+  private void populateColumnValues(TypeDescription typeDescription, ColumnVector[] columnVectors,
+                                    int index, long value) {
+    for (int columnId = 0; columnId < typeDescription.getChildren().size(); columnId++) {
+      switch (typeDescription.getChildren().get(columnId).getCategory()) {
+        case INT:
+        case LONG:
+          ((LongColumnVector) columnVectors[columnId]).vector[index] = value;
+          break;
+        case STRING:
+          ((BytesColumnVector) columnVectors[columnId]).setVal(index,
+              ("String-" + index).getBytes(StandardCharsets.UTF_8));
+          break;
+        case STRUCT:
+          populateColumnValues(typeDescription.getChildren().get(columnId),
+              ((StructColumnVector) columnVectors[columnId]).fields, index, value);
+          break;
+        default:
+          throw new IllegalArgumentException();
+      }
+    }
+  }
+
+  private void readSingleRowWithFilter(int id) throws IOException {
+    Reader reader = OrcFile.createReader(filePath,
+        OrcFile.readerOptions(configuration).filesystem(fileSystem));
+    SearchArgument searchArgument = SearchArgumentFactory.newBuilder()
+        .in("int1", PredicateLeaf.Type.LONG, Long.valueOf(id))
+        .build();
+    Reader.Options readerOptions = reader.options()
+        .searchArgument(searchArgument, new String[] {"int1"})
+        .useSelected(true)
+        .allowSARGToFilter(true);
+    VectorizedRowBatch vectorizedRowBatch = acidSchema.createRowBatch();
+    long rowCount = 0;
+    try (RecordReader recordReader = reader.rows(readerOptions)) {
+      assertTrue(recordReader.nextBatch(vectorizedRowBatch));
+      rowCount += vectorizedRowBatch.size;
+      assertEquals(6, vectorizedRowBatch.cols.length);
+      assertTrue(vectorizedRowBatch.cols[5] instanceof StructColumnVector);
+      assertTrue(((StructColumnVector) vectorizedRowBatch.cols[5]).fields[0]
+          instanceof LongColumnVector);
+      assertTrue(((StructColumnVector) vectorizedRowBatch.cols[5]).fields[1]
+          instanceof BytesColumnVector);
+      assertEquals(id,
+          ((LongColumnVector) ((StructColumnVector) vectorizedRowBatch.cols[5]).fields[0])
+              .vector[vectorizedRowBatch.selected[0]]);
+      checkStringColumn(id, vectorizedRowBatch);
+      assertFalse(recordReader.nextBatch(vectorizedRowBatch));
+    }
+    assertEquals(1, rowCount);
+  }
+
+  private static void checkStringColumn(int id, VectorizedRowBatch vectorizedRowBatch) {
+    BytesColumnVector bytesColumnVector =
+        (BytesColumnVector) ((StructColumnVector) vectorizedRowBatch.cols[5]).fields[1];
+    assertEquals("String-" + id, bytesColumnVector.toString(id));
+  }
 }
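
For context on why the small change in ParserUtils.findColumnVectors is enough, here is a minimal sketch (not part of the commit; the class name AcidColumnLookupSketch is made up for illustration). SchemaEvolution.createEventSchema wraps the user schema in ORC's ACID event layout, whose last field "row" holds the actual user columns, so a name such as "string1" has to be resolved against the fields of that trailing struct rather than against the five top-level ACID metadata vectors:

    import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
    import org.apache.orc.OrcFilterContext;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.impl.OrcFilterContextImpl;
    import org.apache.orc.impl.SchemaEvolution;

    public class AcidColumnLookupSketch {
      public static void main(String[] args) {
        TypeDescription userSchema =
            TypeDescription.fromString("struct<int1:int,string1:string>");

        // createEventSchema produces the ACID event wrapper:
        // struct<operation:int,originalTransaction:bigint,bucket:int,
        //        rowId:bigint,currentTransaction:bigint,
        //        row:struct<int1:int,string1:string>>
        TypeDescription acidSchema = SchemaEvolution.createEventSchema(userSchema);

        OrcFilterContext ctx = new OrcFilterContextImpl(acidSchema, true)
            .setBatch(acidSchema.createRowBatch());

        // With the fix, the lookup starts from the fields of the trailing
        // "row" struct (batch.cols[5] for this schema), so the user column
        // resolves to the expected vector type.
        ColumnVector[] found = ctx.findColumnVector("string1");
        System.out.println(found[0] instanceof BytesColumnVector); // prints true
      }
    }

Before the fix, the same lookup walked batch.cols directly, so "string1" was matched against the ACID metadata vectors instead of the wrapped user columns; that mismatch is what broke row-level filtering on ACID tables, as exercised end-to-end by the new testRowFilterWithACIDTable test above.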
