Repository: spark Updated Branches: refs/heads/master c70c38eb9 -> dec9aa3b3
[SPARK-20961][SQL] generalize the dictionary in ColumnVector ## What changes were proposed in this pull request? As the first step of https://issues.apache.org/jira/browse/SPARK-20960 , to make `ColumnVector` public, this PR generalizes `ColumnVector.dictionary` so that it is no longer coupled to Parquet. ## How was this patch tested? Existing tests. Author: Wenchen Fan <[email protected]> Closes #18183 from cloud-fan/dictionary. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dec9aa3b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dec9aa3b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dec9aa3b Branch: refs/heads/master Commit: dec9aa3b37c01454065a4d8899859991f43d4c66 Parents: c70c38e Author: Wenchen Fan <[email protected]> Authored: Sun Jun 4 13:43:51 2017 -0700 Committer: Xiao Li <[email protected]> Committed: Sun Jun 4 13:43:51 2017 -0700 ---------------------------------------------------------------------- .../datasources/parquet/ParquetDictionary.java | 53 ++++++++++++++++++++ .../parquet/VectorizedColumnReader.java | 2 +- .../parquet/VectorizedParquetRecordReader.java | 17 +++---- .../sql/execution/vectorized/ColumnVector.java | 15 ++---- .../sql/execution/vectorized/Dictionary.java | 34 +++++++++++++ 5 files changed, 100 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/dec9aa3b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java new file mode 100644 index 0000000..0930ede --- /dev/null +++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet; + +import org.apache.spark.sql.execution.vectorized.Dictionary; + +public final class ParquetDictionary implements Dictionary { + private org.apache.parquet.column.Dictionary dictionary; + + public ParquetDictionary(org.apache.parquet.column.Dictionary dictionary) { + this.dictionary = dictionary; + } + + @Override + public int decodeToInt(int id) { + return dictionary.decodeToInt(id); + } + + @Override + public long decodeToLong(int id) { + return dictionary.decodeToLong(id); + } + + @Override + public float decodeToFloat(int id) { + return dictionary.decodeToFloat(id); + } + + @Override + public double decodeToDouble(int id) { + return dictionary.decodeToDouble(id); + } + + @Override + public byte[] decodeToBinary(int id) { + return dictionary.decodeToBinary(id).getBytes(); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/dec9aa3b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java 
---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index 9d641b5..fd8db17 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -169,7 +169,7 @@ public class VectorizedColumnReader { // Column vector supports lazy decoding of dictionary values so just set the dictionary. // We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some // non-dictionary encoded values have already been added). - column.setDictionary(dictionary); + column.setDictionary(new ParquetDictionary(dictionary)); } else { decodeDictionaryIds(rowId, num, column, dictionaryIds); } http://git-wip-us.apache.org/repos/asf/spark/blob/dec9aa3b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index 51bdf0f..04f8141 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -154,12 +154,6 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa return (float) rowsReturned / totalRowCount; } - /** - * Returns the ColumnarBatch object that will be used for all rows returned by this reader. - * This object is reused. 
Calling this enables the vectorized reader. This should be called - * before any calls to nextKeyValue/nextBatch. - */ - // Creates a columnar batch that includes the schema from the data files and the additional // partition columns appended to the end of the batch. // For example, if the data contains two columns, with 2 partition columns: @@ -204,12 +198,17 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa initBatch(DEFAULT_MEMORY_MODE, partitionColumns, partitionValues); } + /** + * Returns the ColumnarBatch object that will be used for all rows returned by this reader. + * This object is reused. Calling this enables the vectorized reader. This should be called + * before any calls to nextKeyValue/nextBatch. + */ public ColumnarBatch resultBatch() { if (columnarBatch == null) initBatch(); return columnarBatch; } - /* + /** * Can be called before any rows are returned to enable returning columnar batches directly. */ public void enableReturningBatches() { @@ -237,9 +236,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa } private void initializeInternal() throws IOException, UnsupportedOperationException { - /** - * Check that the requested schema is supported. - */ + // Check that the requested schema is supported. 
missingColumns = new boolean[requestedSchema.getFieldCount()]; for (int i = 0; i < requestedSchema.getFieldCount(); ++i) { Type t = requestedSchema.getFields().get(i); http://git-wip-us.apache.org/repos/asf/spark/blob/dec9aa3b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index ad267ab..24260a6 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -20,8 +20,6 @@ import java.math.BigDecimal; import java.math.BigInteger; import com.google.common.annotations.VisibleForTesting; -import org.apache.parquet.column.Dictionary; -import org.apache.parquet.io.api.Binary; import org.apache.spark.memory.MemoryMode; import org.apache.spark.sql.catalyst.InternalRow; @@ -313,8 +311,8 @@ public abstract class ColumnVector implements AutoCloseable { } /** - * Ensures that there is enough storage to store capcity elements. That is, the put() APIs - * must work for all rowIds < capcity. + * Ensures that there is enough storage to store capacity elements. That is, the put() APIs + * must work for all rowIds < capacity. */ protected abstract void reserveInternal(int capacity); @@ -479,7 +477,6 @@ public abstract class ColumnVector implements AutoCloseable { /** * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count) - * src should contain `count` doubles written as ieee format. 
*/ public abstract void putFloats(int rowId, int count, float[] src, int srcIndex); @@ -506,7 +503,6 @@ public abstract class ColumnVector implements AutoCloseable { /** * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count) - * src should contain `count` doubles written as ieee format. */ public abstract void putDoubles(int rowId, int count, double[] src, int srcIndex); @@ -628,8 +624,8 @@ public abstract class ColumnVector implements AutoCloseable { ColumnVector.Array a = getByteArray(rowId); return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length); } else { - Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId)); - return UTF8String.fromBytes(v.getBytes()); + byte[] bytes = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId)); + return UTF8String.fromBytes(bytes); } } @@ -643,8 +639,7 @@ public abstract class ColumnVector implements AutoCloseable { System.arraycopy(array.byteArray, array.byteArrayOffset, bytes, 0, bytes.length); return bytes; } else { - Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId)); - return v.getBytes(); + return dictionary.decodeToBinary(dictionaryIds.getDictId(rowId)); } } http://git-wip-us.apache.org/repos/asf/spark/blob/dec9aa3b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/Dictionary.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/Dictionary.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/Dictionary.java new file mode 100644 index 0000000..c698168 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/Dictionary.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.vectorized; + +/** + * The interface for dictionary in ColumnVector to decode dictionary encoded values. + */ +public interface Dictionary { + + int decodeToInt(int id); + + long decodeToLong(int id); + + float decodeToFloat(int id); + + double decodeToDouble(int id); + + byte[] decodeToBinary(int id); +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
