This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/branch-1.8 by this push:
     new 8daa92eb3 ORC-1139: Benchmark for ORC-1136 that combines multiple 
individual close reads into a single read
8daa92eb3 is described below

commit 8daa92eb391e783d73f4e70291cafeb24e900601
Author: Pavan Lanka <[email protected]>
AuthorDate: Mon May 9 09:42:34 2022 -0700

    ORC-1139: Benchmark for ORC-1136 that combines multiple individual close 
reads into a single read
    
    Adds new benchmark to estimate the impact of combining multiple close reads 
into a single read.
    
    Bench addition to ensure that we have not added any penalties as a result 
of this change
    
    Running and verifying the bench results
    
    Closes #1112
    
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 1ff438813333b75585c49828768e41199de5a088)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../apache/orc/bench/core/filter/FilterBench.java  |  35 ++---
 .../apache/orc/bench/core/impl/ChunkReadBench.java | 121 +++++++++++++++++
 .../apache/orc/bench/core/impl/ChunkReadUtil.java  | 148 +++++++++++++++++++++
 .../orc/bench/core/impl/ChunkReadUtilTest.java     |  90 +++++++++++++
 4 files changed, 379 insertions(+), 15 deletions(-)

diff --git 
a/java/bench/core/src/java/org/apache/orc/bench/core/filter/FilterBench.java 
b/java/bench/core/src/java/org/apache/orc/bench/core/filter/FilterBench.java
index c984e6d23..e3d88b119 100644
--- a/java/bench/core/src/java/org/apache/orc/bench/core/filter/FilterBench.java
+++ b/java/bench/core/src/java/org/apache/orc/bench/core/filter/FilterBench.java
@@ -81,7 +81,7 @@ public class FilterBench implements OrcBenchmark {
     new Runner(parseOptions(args)).run();
   }
 
-  private static CommandLine parseCommandLine(String[] args) {
+  public static CommandLine parseCommandLine(String[] args, boolean needsArgs) 
{
     org.apache.commons.cli.Options options = new 
org.apache.commons.cli.Options()
       .addOption("h", HELP, false, "Provide help")
       .addOption("i", ITERATIONS, true, "Number of iterations")
@@ -98,7 +98,7 @@ public class FilterBench implements OrcBenchmark {
       System.err.println("Argument exception - " + pe.getMessage());
       result = null;
     }
-    if (result == null || result.hasOption(HELP) || result.getArgs().length == 
0) {
+    if (result == null || result.hasOption(HELP) || (needsArgs && 
result.getArgs().length == 0)) {
       new HelpFormatter().printHelp("java -jar <jar> <command> <options> 
<sub_cmd>\n"
                                     + "sub_cmd:\nsimple\ncomplex\n",
                                     options);
@@ -108,20 +108,8 @@ public class FilterBench implements OrcBenchmark {
     return result;
   }
 
-  public static Options parseOptions(String[] args) {
-    CommandLine options = parseCommandLine(args);
-    String cmd = options.getArgs()[0];
+  public static OptionsBuilder optionsBuilder(CommandLine options) {
     OptionsBuilder builder = new OptionsBuilder();
-    switch (cmd) {
-      case "simple":
-        builder.include(SimpleFilter.class.getSimpleName());
-        break;
-      case "complex":
-        builder.include(ComplexFilter.class.getSimpleName());
-        break;
-      default:
-        throw new UnsupportedOperationException(String.format("Command %s is 
not supported", cmd));
-    }
     if (options.hasOption(GC)) {
       builder.addProfiler("hs_gc");
     }
@@ -147,6 +135,23 @@ public class FilterBench implements OrcBenchmark {
     String maxMemory = options.getOptionValue(MAX_MEMORY, "2g");
     builder.jvmArgs("-server",
                     "-Xms" + minMemory, "-Xmx" + maxMemory);
+    return builder;
+  }
+
+  public static Options parseOptions(String[] args) {
+    CommandLine options = parseCommandLine(args, true);
+    OptionsBuilder builder = optionsBuilder(options);
+    String cmd = options.getArgs()[0];
+    switch (cmd) {
+      case "simple":
+        builder.include(SimpleFilter.class.getSimpleName());
+        break;
+      case "complex":
+        builder.include(ComplexFilter.class.getSimpleName());
+        break;
+      default:
+        throw new UnsupportedOperationException(String.format("Command %s is 
not supported", cmd));
+    }
     return builder.build();
   }
 
diff --git 
a/java/bench/core/src/java/org/apache/orc/bench/core/impl/ChunkReadBench.java 
b/java/bench/core/src/java/org/apache/orc/bench/core/impl/ChunkReadBench.java
new file mode 100644
index 000000000..a33618449
--- /dev/null
+++ 
b/java/bench/core/src/java/org/apache/orc/bench/core/impl/ChunkReadBench.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.core.impl;
+
+import com.google.auto.service.AutoService;
+import org.apache.commons.cli.CommandLine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.bench.core.OrcBenchmark;
+import org.apache.orc.bench.core.filter.FilterBench;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+@OutputTimeUnit(value = TimeUnit.SECONDS)
+@Warmup(iterations = 20, time = 1)
+@BenchmarkMode(value = Mode.AverageTime)
+@Fork(value = 1)
+@State(value = Scope.Benchmark)
+@Measurement(iterations = 20, time = 1)
+@AutoService(OrcBenchmark.class)
+public class ChunkReadBench implements OrcBenchmark {
+  @Override
+  public String getName() {
+    return "chunk_read";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Perform chunk read bench";
+  }
+
+  @Override
+  public void run(String[] args) throws Exception {
+    CommandLine options = FilterBench.parseCommandLine(args, false);
+    OptionsBuilder builder = FilterBench.optionsBuilder(options);
+    builder.include(getClass().getSimpleName());
+    new Runner(builder.build()).run();
+  }
+
+  private final Path workDir = new Path(System.getProperty("test.tmp.dir",
+                                                           "target" + 
File.separator + "test"
+                                                           + File.separator + 
"tmp"));
+  private final Path filePath = new Path(workDir, "perf_chunk_read_file.orc");
+  private final Configuration conf = new Configuration();
+  @Param( {"128"})
+  private int colCount;
+
+  @Param( {"65536"})
+  private int rowCount;
+
+  @Param( {"true", "false"})
+  private boolean alternate;
+
+  @Param( {"0", "4194304"})
+  private int minSeekSize;
+
+  @Param( {"0.0", "10.0"})
+  private double extraByteTolerance;
+
+  private long readRows = 0;
+
+  @Setup
+  public void setup() throws IOException {
+    if (minSeekSize == 0 && extraByteTolerance > 0) {
+      throw new IllegalArgumentException("Ignore extraByteTolerance variations 
with seekSize is"
+                                         + " 0");
+    }
+    FileSystem fs = FileSystem.get(conf);
+    if (!fs.exists(filePath)) {
+      ChunkReadUtil.createORCFile(colCount, rowCount, filePath);
+    }
+    ChunkReadUtil.setConf(conf, minSeekSize, extraByteTolerance);
+  }
+
+  @Benchmark
+  public long read() throws IOException {
+    readRows = ChunkReadUtil.readORCFile(filePath, conf, alternate);
+    return readRows;
+  }
+
+  @TearDown
+  public void tearDown() {
+    if (readRows != rowCount) {
+      throw new IllegalArgumentException(String.format(
+        "readRows %d is not equal to expected rows %d", readRows, rowCount));
+    }
+  }
+}
diff --git 
a/java/bench/core/src/java/org/apache/orc/bench/core/impl/ChunkReadUtil.java 
b/java/bench/core/src/java/org/apache/orc/bench/core/impl/ChunkReadUtil.java
new file mode 100644
index 000000000..6877c9e34
--- /dev/null
+++ b/java/bench/core/src/java/org/apache/orc/bench/core/impl/ChunkReadUtil.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.core.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+
+public class ChunkReadUtil {
+  private static final int SCALE = 6;
+
+  static void setConf(Configuration conf, int minSeekSize, double 
extraByteTolerance) {
+    OrcConf.ORC_MIN_DISK_SEEK_SIZE.setInt(conf, minSeekSize);
+    OrcConf.ORC_MIN_DISK_SEEK_SIZE_TOLERANCE.setDouble(conf, 
extraByteTolerance);
+  }
+
+  static long readORCFile(Path file, Configuration conf, boolean alternate)
+    throws IOException {
+    Reader r = OrcFile.createReader(file, OrcFile.readerOptions(conf));
+    long rowCount = 0;
+    VectorizedRowBatch batch = r.getSchema().createRowBatch();
+    Reader.Options o = r.options();
+    if (alternate) {
+      o.include(includeAlternate(r.getSchema()));
+    }
+    RecordReader rr = r.rows(o);
+    while (rr.nextBatch(batch)) {
+      rowCount += batch.size;
+    }
+    return rowCount;
+  }
+
+  private static boolean[] includeAlternate(TypeDescription schema) {
+    boolean[] includes = new boolean[schema.getMaximumId() + 1];
+    for (int i = 1; i < includes.length; i += 2) {
+      includes[i] = true;
+    }
+    includes[0] = true;
+    return includes;
+  }
+
+  static long createORCFile(int colCount, int rowCount, Path file) throws 
IOException {
+    TypeDescription schema = createSchema(colCount);
+    return writeFile(schema, rowCount, file);
+  }
+
+  private static long writeFile(TypeDescription schema, int rowCount, Path 
path)
+    throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+
+    try (Writer writer = OrcFile.createWriter(path,
+                                              OrcFile.writerOptions(conf)
+                                                .fileSystem(fs)
+                                                .overwrite(true)
+                                                .rowIndexStride(8192)
+                                                .setSchema(schema)
+                                                .overwrite(true))) {
+      Random rnd = new Random(1024);
+      VectorizedRowBatch b = schema.createRowBatch();
+      for (int rowIdx = 0; rowIdx < rowCount; rowIdx++) {
+        ((LongColumnVector) b.cols[0]).vector[b.size] = rowIdx;
+        long v = rnd.nextLong();
+        for (int colIdx = 1; colIdx < schema.getChildren().size() - 1; 
colIdx++) {
+          switch (schema.getChildren().get(colIdx).getCategory()) {
+            case LONG:
+              ((LongColumnVector) b.cols[colIdx]).vector[b.size] = v;
+              break;
+            case DECIMAL:
+              HiveDecimalWritable d = new HiveDecimalWritable();
+              d.setFromLongAndScale(v, SCALE);
+              ((DecimalColumnVector) b.cols[colIdx]).vector[b.size] = d;
+              break;
+            case STRING:
+              ((BytesColumnVector) b.cols[colIdx]).setVal(b.size,
+                                                          String.valueOf(v)
+                                                            
.getBytes(StandardCharsets.UTF_8));
+              break;
+            default:
+              throw new IllegalArgumentException();
+          }
+        }
+        b.size += 1;
+        if (b.size == b.getMaxSize()) {
+          writer.addRowBatch(b);
+          b.reset();
+        }
+      }
+      if (b.size > 0) {
+        writer.addRowBatch(b);
+        b.reset();
+      }
+    }
+    return fs.getFileStatus(path).getLen();
+  }
+
+  private static TypeDescription createSchema(int colCount) {
+    TypeDescription schema = TypeDescription.createStruct()
+      .addField("id", TypeDescription.createLong());
+    for (int i = 1; i <= colCount; i++) {
+      TypeDescription fieldType;
+      switch (i % 3) {
+        case 0:
+          fieldType = TypeDescription.createString();
+          break;
+        case 1:
+          fieldType = 
TypeDescription.createDecimal().withPrecision(20).withScale(SCALE);
+          break;
+        default:
+          fieldType = TypeDescription.createLong();
+          break;
+      }
+      schema.addField("f_" + i, fieldType);
+    }
+    return schema;
+  }
+}
diff --git 
a/java/bench/core/src/test/org/apache/orc/bench/core/impl/ChunkReadUtilTest.java
 
b/java/bench/core/src/test/org/apache/orc/bench/core/impl/ChunkReadUtilTest.java
new file mode 100644
index 000000000..1169998d8
--- /dev/null
+++ 
b/java/bench/core/src/test/org/apache/orc/bench/core/impl/ChunkReadUtilTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.core.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class ChunkReadUtilTest {
+  private static final Path workDir = new 
Path(System.getProperty("test.tmp.dir",
+                                                                  "target" + 
File.separator + "test"
+                                                                  + 
File.separator + "tmp"));
+  private static final Path filePath = new Path(workDir, 
"chunk_read_file.orc");
+  private static long fileLength;
+  private static final int ROW_COUNT = 524288;
+  private static final int COL_COUNT = 16;
+
+  @BeforeAll
+  public static void setup() throws IOException {
+    fileLength = ChunkReadUtil.createORCFile(COL_COUNT, ROW_COUNT, filePath);
+  }
+
+  private static void readStart() {
+    FileSystem.clearStatistics();
+  }
+
+  private static FileSystem.Statistics readEnd() {
+    return FileSystem.getAllStatistics().get(0);
+  }
+
+  @Test
+  public void testReadAll() throws IOException {
+    Configuration conf = new Configuration();
+    readStart();
+    assertEquals(ROW_COUNT, ChunkReadUtil.readORCFile(filePath, conf, false));
+    assertTrue((readEnd().getBytesRead() / (double) fileLength) > 1);
+  }
+
+  @Test
+  public void testReadAlternate() throws IOException {
+    Configuration conf = new Configuration();
+    readStart();
+    assertEquals(ROW_COUNT, ChunkReadUtil.readORCFile(filePath, conf, true));
+    assertTrue((readEnd().getBytesRead() / (double) fileLength) < .5);
+  }
+
+  @Test
+  public void testReadAlternateWMinSeekSize() throws IOException {
+    Configuration conf = new Configuration();
+    ChunkReadUtil.setConf(conf, 4 * 1024 * 1024, 10);
+    readStart();
+    assertEquals(ROW_COUNT, ChunkReadUtil.readORCFile(filePath, conf, true));
+    double readFraction = readEnd().getBytesRead() / (double) fileLength;
+    assertTrue(readFraction > 1 && readFraction < 1.01);
+  }
+
+  @Test
+  public void testReadAlternateWMinSeekSizeDrop() throws IOException {
+    Configuration conf = new Configuration();
+    ChunkReadUtil.setConf(conf, 4 * 1024 * 1024, 0);
+    readStart();
+    assertEquals(ROW_COUNT, ChunkReadUtil.readORCFile(filePath, conf, true));
+    double readFraction = readEnd().getBytesRead() / (double) fileLength;
+    assertTrue(readFraction > 1 && readFraction < 1.01);
+  }
+}
\ No newline at end of file

Reply via email to