This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/branch-1.8 by this push:
new 54132097c ORC-1172: Add row count limit config in one stripe (#1118)
54132097c is described below
commit 54132097c3873e8826cf1d8e4f49436848dad138
Author: dengweisysu <[email protected]>
AuthorDate: Tue May 24 13:45:09 2022 +0800
ORC-1172: Add row count limit config in one stripe (#1118)
### What changes were proposed in this pull request?
Add a row count limit config, "orc.stripe.row.count", to limit the number of
rows in one stripe.
### Why are the changes needed?
For query engines like Presto, the stripe is the base unit of query concurrency:
one stripe can only be processed by one split.
In the current implementation of the ORC writer, the only config that can
control the row count in a stripe is "orc.stripe.size".
But across different kinds of tables, the row count is difficult to control this way.
For a table with many columns (e.g. 100 columns), 64MB may contain 5000 rows.
For a table with few columns (e.g. 5 columns), 64MB may contain 100000 rows.
For Presto, a normal OLAP query reads only a subset of the table columns, so the
row count is the key factor in query performance. If one stripe contains too many
rows, query performance may become too low.
So, besides the config "orc.stripe.size", we need another config,
"orc.stripe.row.count", to control the row count of one stripe.
A similar config has been introduced to cuDF (a GPU DataFrame library
based on Apache Arrow):
[rapidsai/cudf#9261](https://github.com/rapidsai/cudf/issues/9261)
### How was this patch tested?
testStripeRowCountLimit was added.
It can be run with the commands below:
```
cd java
./mvnw -Dtest=TestWriterImpl test
```
(cherry picked from commit 7facf81846a5e4395f2dbab335faef93f04c6aee)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
java/core/src/java/org/apache/orc/OrcConf.java | 7 ++++++-
java/core/src/java/org/apache/orc/OrcFile.java | 6 ++++++
.../src/java/org/apache/orc/impl/WriterImpl.java | 10 +++++++---
.../test/org/apache/orc/impl/TestWriterImpl.java | 22 ++++++++++++++++++++++
4 files changed, 41 insertions(+), 4 deletions(-)
diff --git a/java/core/src/java/org/apache/orc/OrcConf.java
b/java/core/src/java/org/apache/orc/OrcConf.java
index 25bd8b973..86d6a4a3a 100644
--- a/java/core/src/java/org/apache/orc/OrcConf.java
+++ b/java/core/src/java/org/apache/orc/OrcConf.java
@@ -29,6 +29,10 @@ public enum OrcConf {
STRIPE_SIZE("orc.stripe.size", "hive.exec.orc.default.stripe.size",
64L * 1024 * 1024,
"Define the default ORC stripe size, in bytes."),
+ STRIPE_ROW_COUNT("orc.stripe.row.count", "orc.stripe.row.count",
+ Integer.MAX_VALUE, "This value limit the row count in one stripe. \n" +
+ "The number of stripe rows can be controlled at \n" +
+ "(0, \"orc.stripe.row.count\" + max(batchSize,
\"orc.rows.between.memory.checks\"))"),
BLOCK_SIZE("orc.block.size", "hive.exec.orc.default.block.size",
256L * 1024 * 1024,
"Define the default file system block size for ORC files."),
@@ -161,7 +165,8 @@ public enum OrcConf {
ROWS_BETWEEN_CHECKS("orc.rows.between.memory.checks",
"orc.rows.between.memory.checks", 5000,
"How often should MemoryManager check the memory sizes? Measured in
rows\n" +
"added to all of the writers. Valid range is [1,10000] and is primarily
meant for" +
- "testing. Setting this too low may negatively affect performance."),
+ "testing. Setting this too low may negatively affect performance."
+ + " Use orc.stripe.row.count instead if the value larger than
orc.stripe.row.count."),
OVERWRITE_OUTPUT_FILE("orc.overwrite.output.file",
"orc.overwrite.output.file", false,
"A boolean flag to enable overwriting of the output file if it already
exists.\n"),
IS_SCHEMA_EVOLUTION_CASE_SENSITIVE("orc.schema.evolution.case.sensitive",
diff --git a/java/core/src/java/org/apache/orc/OrcFile.java
b/java/core/src/java/org/apache/orc/OrcFile.java
index a23a3f52e..fa9487b3d 100644
--- a/java/core/src/java/org/apache/orc/OrcFile.java
+++ b/java/core/src/java/org/apache/orc/OrcFile.java
@@ -430,6 +430,7 @@ public class OrcFile {
private FileSystem fileSystemValue = null;
private TypeDescription schema = null;
private long stripeSizeValue;
+ private long stripeRowCountValue;
private long blockSizeValue;
private int rowIndexStrideValue;
private int bufferSizeValue;
@@ -463,6 +464,7 @@ public class OrcFile {
memoryManagerValue = getStaticMemoryManager(conf);
overwrite = OrcConf.OVERWRITE_OUTPUT_FILE.getBoolean(tableProperties,
conf);
stripeSizeValue = OrcConf.STRIPE_SIZE.getLong(tableProperties, conf);
+ stripeRowCountValue = OrcConf.STRIPE_ROW_COUNT.getLong(tableProperties,
conf);
blockSizeValue = OrcConf.BLOCK_SIZE.getLong(tableProperties, conf);
rowIndexStrideValue =
(int) OrcConf.ROW_INDEX_STRIDE.getLong(tableProperties, conf);
@@ -871,6 +873,10 @@ public class OrcFile {
return stripeSizeValue;
}
+ public long getStripeRowCountValue() {
+ return stripeRowCountValue;
+ }
+
public CompressionKind getCompress() {
return compressValue;
}
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index 072460523..e7d71a142 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -88,6 +88,7 @@ public class WriterImpl implements WriterInternal,
MemoryManager.Callback {
private final Path path;
private final long stripeSize;
+ private final long stripeRowCount;
private final int rowIndexStride;
private final TypeDescription schema;
private final PhysicalWriter physicalWriter;
@@ -210,7 +211,9 @@ public class WriterImpl implements WriterInternal,
MemoryManager.Callback {
}
// ensure that we are able to handle callbacks before we register ourselves
- ROWS_PER_CHECK = OrcConf.ROWS_BETWEEN_CHECKS.getLong(conf);
+ ROWS_PER_CHECK = Math.min(opts.getStripeRowCountValue(),
+ OrcConf.ROWS_BETWEEN_CHECKS.getLong(conf));
+ this.stripeRowCount= opts.getStripeRowCountValue();
this.stripeSize = opts.getStripeSize();
memoryLimit = stripeSize;
memoryManager = opts.getMemoryManager();
@@ -298,9 +301,10 @@ public class WriterImpl implements WriterInternal,
MemoryManager.Callback {
long size = treeWriter.estimateMemory();
if (LOG.isDebugEnabled()) {
LOG.debug("ORC writer " + physicalWriter + " size = " + size +
- " limit = " + memoryLimit);
+ " memoryLimit = " + memoryLimit + " rowsInStripe = " +
rowsInStripe +
+ " stripeRowCountLimit = " + stripeRowCount);
}
- if (size > memoryLimit) {
+ if (size > memoryLimit || rowsInStripe >= stripeRowCount) {
flushStripe();
return true;
}
diff --git a/java/core/src/test/org/apache/orc/impl/TestWriterImpl.java
b/java/core/src/test/org/apache/orc/impl/TestWriterImpl.java
index b17e1b769..8db041d2c 100644
--- a/java/core/src/test/org/apache/orc/impl/TestWriterImpl.java
+++ b/java/core/src/test/org/apache/orc/impl/TestWriterImpl.java
@@ -119,6 +119,28 @@ public class TestWriterImpl {
assertEquals(r.getStripes(), w.getStripes());
}
+ @Test
+ public void testStripeRowCountLimit() throws Exception {
+ conf.set(OrcConf.OVERWRITE_OUTPUT_FILE.getAttribute(), "true");
+ conf.set(OrcConf.STRIPE_ROW_COUNT.getAttribute(),"100");
+ VectorizedRowBatch b = schema.createRowBatch();
+ LongColumnVector f1 = (LongColumnVector) b.cols[0];
+ LongColumnVector f2 = (LongColumnVector) b.cols[1];
+ Writer w = OrcFile.createWriter(testFilePath,
OrcFile.writerOptions(conf).setSchema(schema));
+ long rowCount = 1000;
+ for (int i = 0; i < rowCount; i++) {
+ f1.vector[b.size] = Long.MIN_VALUE ;
+ f2.vector[b.size] = Long.MAX_VALUE ;
+ b.size += 1;
+ if (b.size == 10) {
+ w.addRowBatch(b);
+ b.reset();
+ }
+ }
+ w.close();
+ assertEquals(10, w.getStripes().size());
+ }
+
@Test
public void testCloseIsIdempotent() throws IOException {
conf.set(OrcConf.OVERWRITE_OUTPUT_FILE.getAttribute(), "true");