This is an automated email from the ASF dual-hosted git repository.
yqzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new 7facf8184 ORC-1172: Add row count limit config in one stripe (#1118)
7facf8184 is described below
commit 7facf81846a5e4395f2dbab335faef93f04c6aee
Author: dengweisysu <[email protected]>
AuthorDate: Tue May 24 13:45:09 2022 +0800
ORC-1172: Add row count limit config in one stripe (#1118)
### What changes were proposed in this pull request?
Add a row-count limit config, "orc.stripe.row.count", to limit the number of
rows in one stripe.
### Why are the changes needed?
For query engines like Presto, the stripe is the basic unit of query
concurrency: one stripe can only be processed by one split.
In the current ORC writer implementation, the only config that controls the
row count of a stripe (indirectly) is "orc.stripe.size". Because tables differ
widely, a byte-size limit translates into very different row counts:
for a table with many columns (e.g. 100 columns), 64MB may contain 5,000 rows;
for a table with few columns (e.g. 5 columns), 64MB may contain 100,000 rows.
In Presto, a typical OLAP query reads only a subset of a table's columns, so
the row count of a stripe is the key factor in query performance. If one stripe
contains too many rows, query performance can degrade significantly.
So, in addition to "orc.stripe.size", we need another config,
"orc.stripe.row.count", to control the row count of one stripe.
A similar config has been introduced in cudf (a GPU DataFrame library
based on Apache Arrow):
[rapidsai/cudf#9261](https://github.com/rapidsai/cudf/issues/9261)
### How was this patch tested?
Added testStripeRowCountLimit.
It can be run with the command below:
```
cd java
./mvnw -Dtest=TestWriterImpl test
```
---
java/core/src/java/org/apache/orc/OrcConf.java | 7 ++++++-
java/core/src/java/org/apache/orc/OrcFile.java | 6 ++++++
.../src/java/org/apache/orc/impl/WriterImpl.java | 10 +++++++---
.../test/org/apache/orc/impl/TestWriterImpl.java | 22 ++++++++++++++++++++++
4 files changed, 41 insertions(+), 4 deletions(-)
diff --git a/java/core/src/java/org/apache/orc/OrcConf.java
b/java/core/src/java/org/apache/orc/OrcConf.java
index e623080bd..aef00fa63 100644
--- a/java/core/src/java/org/apache/orc/OrcConf.java
+++ b/java/core/src/java/org/apache/orc/OrcConf.java
@@ -29,6 +29,10 @@ public enum OrcConf {
STRIPE_SIZE("orc.stripe.size", "hive.exec.orc.default.stripe.size",
64L * 1024 * 1024,
"Define the default ORC stripe size, in bytes."),
+ STRIPE_ROW_COUNT("orc.stripe.row.count", "orc.stripe.row.count",
+ Integer.MAX_VALUE, "This value limit the row count in one stripe. \n" +
+ "The number of stripe rows can be controlled at \n" +
+ "(0, \"orc.stripe.row.count\" + max(batchSize,
\"orc.rows.between.memory.checks\"))"),
BLOCK_SIZE("orc.block.size", "hive.exec.orc.default.block.size",
256L * 1024 * 1024,
"Define the default file system block size for ORC files."),
@@ -161,7 +165,8 @@ public enum OrcConf {
ROWS_BETWEEN_CHECKS("orc.rows.between.memory.checks",
"orc.rows.between.memory.checks", 5000,
"How often should MemoryManager check the memory sizes? Measured in
rows\n" +
"added to all of the writers. Valid range is [1,10000] and is primarily
meant for" +
- "testing. Setting this too low may negatively affect performance."),
+ "testing. Setting this too low may negatively affect performance."
+ + " Use orc.stripe.row.count instead if the value larger than
orc.stripe.row.count."),
OVERWRITE_OUTPUT_FILE("orc.overwrite.output.file",
"orc.overwrite.output.file", false,
"A boolean flag to enable overwriting of the output file if it already
exists.\n"),
IS_SCHEMA_EVOLUTION_CASE_SENSITIVE("orc.schema.evolution.case.sensitive",
diff --git a/java/core/src/java/org/apache/orc/OrcFile.java
b/java/core/src/java/org/apache/orc/OrcFile.java
index a23a3f52e..fa9487b3d 100644
--- a/java/core/src/java/org/apache/orc/OrcFile.java
+++ b/java/core/src/java/org/apache/orc/OrcFile.java
@@ -430,6 +430,7 @@ public class OrcFile {
private FileSystem fileSystemValue = null;
private TypeDescription schema = null;
private long stripeSizeValue;
+ private long stripeRowCountValue;
private long blockSizeValue;
private int rowIndexStrideValue;
private int bufferSizeValue;
@@ -463,6 +464,7 @@ public class OrcFile {
memoryManagerValue = getStaticMemoryManager(conf);
overwrite = OrcConf.OVERWRITE_OUTPUT_FILE.getBoolean(tableProperties,
conf);
stripeSizeValue = OrcConf.STRIPE_SIZE.getLong(tableProperties, conf);
+ stripeRowCountValue = OrcConf.STRIPE_ROW_COUNT.getLong(tableProperties,
conf);
blockSizeValue = OrcConf.BLOCK_SIZE.getLong(tableProperties, conf);
rowIndexStrideValue =
(int) OrcConf.ROW_INDEX_STRIDE.getLong(tableProperties, conf);
@@ -871,6 +873,10 @@ public class OrcFile {
return stripeSizeValue;
}
+ public long getStripeRowCountValue() {
+ return stripeRowCountValue;
+ }
+
public CompressionKind getCompress() {
return compressValue;
}
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index 072460523..e7d71a142 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -88,6 +88,7 @@ public class WriterImpl implements WriterInternal,
MemoryManager.Callback {
private final Path path;
private final long stripeSize;
+ private final long stripeRowCount;
private final int rowIndexStride;
private final TypeDescription schema;
private final PhysicalWriter physicalWriter;
@@ -210,7 +211,9 @@ public class WriterImpl implements WriterInternal,
MemoryManager.Callback {
}
// ensure that we are able to handle callbacks before we register ourselves
- ROWS_PER_CHECK = OrcConf.ROWS_BETWEEN_CHECKS.getLong(conf);
+ ROWS_PER_CHECK = Math.min(opts.getStripeRowCountValue(),
+ OrcConf.ROWS_BETWEEN_CHECKS.getLong(conf));
+ this.stripeRowCount= opts.getStripeRowCountValue();
this.stripeSize = opts.getStripeSize();
memoryLimit = stripeSize;
memoryManager = opts.getMemoryManager();
@@ -298,9 +301,10 @@ public class WriterImpl implements WriterInternal,
MemoryManager.Callback {
long size = treeWriter.estimateMemory();
if (LOG.isDebugEnabled()) {
LOG.debug("ORC writer " + physicalWriter + " size = " + size +
- " limit = " + memoryLimit);
+ " memoryLimit = " + memoryLimit + " rowsInStripe = " +
rowsInStripe +
+ " stripeRowCountLimit = " + stripeRowCount);
}
- if (size > memoryLimit) {
+ if (size > memoryLimit || rowsInStripe >= stripeRowCount) {
flushStripe();
return true;
}
diff --git a/java/core/src/test/org/apache/orc/impl/TestWriterImpl.java
b/java/core/src/test/org/apache/orc/impl/TestWriterImpl.java
index b17e1b769..8db041d2c 100644
--- a/java/core/src/test/org/apache/orc/impl/TestWriterImpl.java
+++ b/java/core/src/test/org/apache/orc/impl/TestWriterImpl.java
@@ -119,6 +119,28 @@ public class TestWriterImpl {
assertEquals(r.getStripes(), w.getStripes());
}
+ @Test
+ public void testStripeRowCountLimit() throws Exception {
+ conf.set(OrcConf.OVERWRITE_OUTPUT_FILE.getAttribute(), "true");
+ conf.set(OrcConf.STRIPE_ROW_COUNT.getAttribute(),"100");
+ VectorizedRowBatch b = schema.createRowBatch();
+ LongColumnVector f1 = (LongColumnVector) b.cols[0];
+ LongColumnVector f2 = (LongColumnVector) b.cols[1];
+ Writer w = OrcFile.createWriter(testFilePath,
OrcFile.writerOptions(conf).setSchema(schema));
+ long rowCount = 1000;
+ for (int i = 0; i < rowCount; i++) {
+ f1.vector[b.size] = Long.MIN_VALUE ;
+ f2.vector[b.size] = Long.MAX_VALUE ;
+ b.size += 1;
+ if (b.size == 10) {
+ w.addRowBatch(b);
+ b.reset();
+ }
+ }
+ w.close();
+ assertEquals(10, w.getStripes().size());
+ }
+
@Test
public void testCloseIsIdempotent() throws IOException {
conf.set(OrcConf.OVERWRITE_OUTPUT_FILE.getAttribute(), "true");