This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch column-indexes
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/column-indexes by this push:
new 55d791c PARQUET-1389: Improve value skipping at page synchronization (#514)
55d791c is described below
commit 55d791c592ba9ea97123408040aa3fa01c632a81
Author: Gabor Szadovszky <[email protected]>
AuthorDate: Tue Sep 11 13:52:28 2018 +0200
PARQUET-1389: Improve value skipping at page synchronization (#514)
---
.../parquet/column/impl/ColumnReaderBase.java | 47 +++++++++++++++++++++-
.../apache/parquet/column/values/ValuesReader.java | 12 ++++++
.../delta/DeltaBinaryPackingValuesReader.java | 8 ++++
.../DeltaLengthByteArrayValuesReader.java | 12 ++++--
.../plain/FixedLenByteArrayPlainValuesReader.java | 8 +++-
.../column/values/plain/PlainValuesReader.java | 36 +++++++++++------
.../column/values/rle/ZeroIntegerValuesReader.java | 6 ++-
7 files changed, 109 insertions(+), 20 deletions(-)
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java
index 0af85c7..7682236 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java
@@ -73,6 +73,14 @@ abstract class ColumnReaderBase implements ColumnReader {
abstract void skip();
/**
+ * Skips n values from the underlying page
+ *
+ * @param n
+ * the number of values to be skipped
+ */
+ abstract void skip(int n);
+
+ /**
* write current value to converter
*/
abstract void writeValue();
@@ -163,6 +171,10 @@ abstract class ColumnReaderBase implements ColumnReader {
public void skip() {
dataColumn.skip();
}
+ @Override
+ void skip(int n) {
+ dataColumn.skip(n);
+ }
public int getDictionaryId() {
return dictionaryId;
}
@@ -203,6 +215,11 @@ abstract class ColumnReaderBase implements ColumnReader {
current = 0;
dataColumn.skip();
}
+ @Override
+ void skip(int n) {
+ current = 0;
+ dataColumn.skip(n);
+ }
public float getFloat() {
return current;
}
@@ -222,6 +239,11 @@ abstract class ColumnReaderBase implements ColumnReader {
current = 0;
dataColumn.skip();
}
+ @Override
+ void skip(int n) {
+ current = 0;
+ dataColumn.skip(n);
+ }
public double getDouble() {
return current;
}
@@ -242,6 +264,11 @@ abstract class ColumnReaderBase implements ColumnReader {
dataColumn.skip();
}
@Override
+ void skip(int n) {
+ current = 0;
+ dataColumn.skip(n);
+ }
+ @Override
public int getInteger() {
return current;
}
@@ -262,6 +289,11 @@ abstract class ColumnReaderBase implements ColumnReader {
dataColumn.skip();
}
@Override
+ void skip(int n) {
+ current = 0;
+ dataColumn.skip(n);
+ }
+ @Override
public long getLong() {
return current;
}
@@ -291,6 +323,11 @@ abstract class ColumnReaderBase implements ColumnReader {
dataColumn.skip();
}
@Override
+ void skip(int n) {
+ current = false;
+ dataColumn.skip(n);
+ }
+ @Override
public boolean getBoolean() {
return current;
}
@@ -311,6 +348,11 @@ abstract class ColumnReaderBase implements ColumnReader {
dataColumn.skip();
}
@Override
+ void skip(int n) {
+ current = null;
+ dataColumn.skip(n);
+ }
+ @Override
public Binary getBinary() {
return current;
}
@@ -511,6 +553,7 @@ abstract class ColumnReaderBase implements ColumnReader {
private void checkRead() {
int rl, dl;
+ int skipValues = 0;
for (;;) {
if (isPageFullyConsumed()) {
if (isFullyConsumed()) {
@@ -519,6 +562,7 @@ abstract class ColumnReaderBase implements ColumnReader {
return;
}
readPage();
+ skipValues = 0;
}
rl = repetitionLevelColumn.nextInt();
dl = definitionLevelColumn.nextInt();
@@ -527,9 +571,10 @@ abstract class ColumnReaderBase implements ColumnReader {
break;
}
if (dl == maxDefinitionLevel) {
- binding.skip();
+ ++skipValues;
}
}
+ binding.skip(skipValues);
repetitionLevel = rl;
definitionLevel = dl;
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
index 5732660..3167d82 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
@@ -109,5 +109,17 @@ public abstract class ValuesReader {
* Skips the next value in the page
*/
abstract public void skip();
+
+ /**
+ * Skips the next n values in the page
+ *
+ * @param n
+ * the number of values to be skipped
+ */
+ public void skip(int n) {
+ for (int i = 0; i < n; ++i) {
+ skip();
+ }
+ }
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
index dceaa52..58e02f2 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
@@ -89,6 +89,14 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
}
@Override
+ public void skip(int n) {
+ // checkRead() is invoked before incrementing valuesRead so increase valuesRead size in 2 steps
+ valuesRead += n - 1;
+ checkRead();
+ ++valuesRead;
+ }
+
+ @Override
public int readInteger() {
// TODO: probably implement it separately
return (int) readLong();
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
index 1a2ccb9..4dbbcb5 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
@@ -20,8 +20,6 @@ package org.apache.parquet.column.values.deltalengthbytearray;
import java.io.IOException;
-import java.nio.ByteBuffer;
-
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
@@ -64,7 +62,15 @@ public class DeltaLengthByteArrayValuesReader extends ValuesReader {
@Override
public void skip() {
- int length = lengthReader.readInteger();
+ skip(1);
+ }
+
+ @Override
+ public void skip(int n) {
+ int length = 0;
+ for (int i = 0; i < n; ++i) {
+ length += lengthReader.readInteger();
+ }
try {
in.skipFully(length);
} catch (IOException e) {
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
index 15ed434..631c908 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
@@ -19,7 +19,6 @@
package org.apache.parquet.column.values.plain;
import java.io.IOException;
-import java.nio.ByteBuffer;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.ParquetDecodingException;
@@ -51,8 +50,13 @@ public class FixedLenByteArrayPlainValuesReader extends ValuesReader {
@Override
public void skip() {
+ skip(1);
+ }
+
+ @Override
+ public void skip(int n) {
try {
- in.skipFully(length);
+ in.skipFully(n * length);
} catch (IOException | RuntimeException e) {
throw new ParquetDecodingException("could not skip bytes at offset " + in.position(), e);
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
index f576528..127817e 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
@@ -41,14 +41,33 @@ abstract public class PlainValuesReader extends ValuesReader {
this.in = new LittleEndianDataInputStream(stream.remainingStream());
}
+ @Override
+ public void skip() {
+ skip(1);
+ }
+
+ void skipBytesFully(int n) throws IOException {
+ int skipped = 0;
+ while (skipped < n) {
+ int step = in.skipBytes(n - skipped);
+ if (step <= 0) {
+ // skipBytes() may make no progress (a result <= 0, e.g. at the end of the stream); read a single
+ // byte instead, which is expected to throw EOFException on a truncated page instead of spinning forever
+ in.readByte();
+ step = 1;
+ }
+ skipped += step;
+ }
+ }
+
public static class DoublePlainValuesReader extends PlainValuesReader {
@Override
- public void skip() {
+ public void skip(int n) {
try {
- in.skipBytes(8);
+ skipBytesFully(n * 8);
} catch (IOException e) {
- throw new ParquetDecodingException("could not skip double", e);
+ throw new ParquetDecodingException("could not skip " + n + " double values", e);
}
}
@@ -65,11 +84,11 @@ abstract public class PlainValuesReader extends ValuesReader {
public static class FloatPlainValuesReader extends PlainValuesReader {
@Override
- public void skip() {
+ public void skip(int n) {
try {
- in.skipBytes(4);
+ skipBytesFully(n * 4);
} catch (IOException e) {
- throw new ParquetDecodingException("could not skip float", e);
+ throw new ParquetDecodingException("could not skip " + n + " floats", e);
}
}
@@ -86,11 +105,11 @@ abstract public class PlainValuesReader extends ValuesReader {
public static class IntegerPlainValuesReader extends PlainValuesReader {
@Override
- public void skip() {
+ public void skip(int n) {
try {
- in.skipBytes(4);
+ skipBytesFully(n * 4);
} catch (IOException e) {
- throw new ParquetDecodingException("could not skip int", e);
+ throw new ParquetDecodingException("could not skip " + n + " ints", e);
}
}
@@ -107,11 +126,11 @@ abstract public class PlainValuesReader extends ValuesReader {
public static class LongPlainValuesReader extends PlainValuesReader {
@Override
- public void skip() {
+ public void skip(int n) {
try {
- in.skipBytes(8);
+ skipBytesFully(n * 8);
} catch (IOException e) {
- throw new ParquetDecodingException("could not skip long", e);
+ throw new ParquetDecodingException("could not skip " + n + " longs", e);
}
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java
index fe00de9..8039cf9 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java
@@ -19,8 +19,6 @@
package org.apache.parquet.column.values.rle;
import java.io.IOException;
-import java.nio.ByteBuffer;
-
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.ValuesReader;
@@ -43,4 +41,8 @@ public class ZeroIntegerValuesReader extends ValuesReader {
public void skip() {
}
+ @Override
+ public void skip(int n) {
+ }
+
}