This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 479a97a7 fix: CometReader.loadVector should not overwrite dictionary ids (#476)
479a97a7 is described below
commit 479a97a7a40f13609d591a5a08c1265b1cc74fdd
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Mon May 27 18:55:19 2024 -0700
fix: CometReader.loadVector should not overwrite dictionary ids (#476)
* fix: CometReader.loadVector should not overwrite dictionary ids
* For review
---
.../org/apache/arrow/c/CometSchemaImporter.java | 73 ++++++++++++++++++++++
.../java/org/apache/comet/parquet/BatchReader.java | 13 ++++
.../org/apache/comet/parquet/ColumnReader.java | 23 ++++---
.../org/apache/comet/parquet/LazyColumnReader.java | 4 +-
.../main/java/org/apache/comet/parquet/Utils.java | 10 ++-
5 files changed, 107 insertions(+), 16 deletions(-)
diff --git a/common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java b/common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java
new file mode 100644
index 00000000..32955f1a
--- /dev/null
+++ b/common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.arrow.c;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.types.pojo.Field;
+
+/** This is a simple wrapper around SchemaImporter to make it accessible from Java Arrow. */
+public class CometSchemaImporter {
+ private final BufferAllocator allocator;
+ private final SchemaImporter importer;
+ private final CDataDictionaryProvider provider = new CDataDictionaryProvider();
+
+ public CometSchemaImporter(BufferAllocator allocator) {
+ this.allocator = allocator;
+ this.importer = new SchemaImporter(allocator);
+ }
+
+ public BufferAllocator getAllocator() {
+ return allocator;
+ }
+
+ public CDataDictionaryProvider getProvider() {
+ return provider;
+ }
+
+ public Field importField(ArrowSchema schema) {
+ try {
+ return importer.importField(schema, provider);
+ } finally {
+ schema.release();
+ schema.close();
+ }
+ }
+
+ /**
+ * Imports data from ArrowArray/ArrowSchema into a FieldVector. This is basically the same as Java
+ * Arrow `Data.importVector`. `Data.importVector` initiates `SchemaImporter` internally which is
+ * used to fill dictionary ids for dictionary encoded vectors. Every call to `importVector` will
+ * begin with dictionary ids starting from 0. So, separate calls to `importVector` will overwrite
+ * dictionary ids. To avoid this, we need to use the same `SchemaImporter` instance for all calls
+ * to `importVector`.
+ */
+ public FieldVector importVector(ArrowArray array, ArrowSchema schema) {
+ Field field = importField(schema);
+ FieldVector vector = field.createVector(allocator);
+ Data.importIntoVector(allocator, array, vector, provider);
+
+ return vector;
+ }
+
+ public void close() {
+ provider.close();
+ }
+}
diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
index 9940390d..bf8e6e55 100644
--- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
@@ -37,6 +37,9 @@ import scala.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.arrow.c.CometSchemaImporter;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -88,6 +91,7 @@ import org.apache.comet.vector.CometVector;
*/
public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(FileReader.class);
+ protected static final BufferAllocator ALLOCATOR = new RootAllocator();
private Configuration conf;
private int capacity;
@@ -104,6 +108,7 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Cl
private MessageType requestedSchema;
private CometVector[] vectors;
private AbstractColumnReader[] columnReaders;
+ private CometSchemaImporter importer;
private ColumnarBatch currentBatch;
private Future<Option<Throwable>> prefetchTask;
private LinkedBlockingQueue<Pair<PageReadStore, Long>> prefetchQueue;
@@ -515,6 +520,10 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Cl
fileReader.close();
fileReader = null;
}
+ if (importer != null) {
+ importer.close();
+ importer = null;
+ }
}
@SuppressWarnings("deprecation")
@@ -552,6 +561,9 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Cl
numRowGroupsMetric.add(1);
}
+ if (importer != null) importer.close();
+ importer = new CometSchemaImporter(ALLOCATOR);
+
List<ColumnDescriptor> columns = requestedSchema.getColumns();
for (int i = 0; i < columns.size(); i++) {
if (missingColumns[i]) continue;
@@ -564,6 +576,7 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Cl
Utils.getColumnReader(
dataType,
columns.get(i),
+ importer,
capacity,
useDecimal128,
useLazyMaterialization,
diff --git a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
index 50991d5b..3d4cb3aa 100644
--- a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
@@ -27,10 +27,7 @@ import org.slf4j.LoggerFactory;
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
-import org.apache.arrow.c.CDataDictionaryProvider;
-import org.apache.arrow.c.Data;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.c.CometSchemaImporter;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
@@ -53,7 +50,6 @@ import org.apache.comet.vector.CometVector;
public class ColumnReader extends AbstractColumnReader {
protected static final Logger LOG = LoggerFactory.getLogger(ColumnReader.class);
- protected static final BufferAllocator ALLOCATOR = new RootAllocator();
/**
* The current Comet vector holding all the values read by this column reader. Owned by this
@@ -89,18 +85,19 @@ public class ColumnReader extends AbstractColumnReader {
*/
boolean hadNull;
- /** Dictionary provider for this column. */
-  private final CDataDictionaryProvider dictionaryProvider = new CDataDictionaryProvider();
+ private final CometSchemaImporter importer;
public ColumnReader(
DataType type,
ColumnDescriptor descriptor,
+ CometSchemaImporter importer,
int batchSize,
boolean useDecimal128,
boolean useLegacyDateTimestamp) {
super(type, descriptor, useDecimal128, useLegacyDateTimestamp);
assert batchSize > 0 : "Batch size must be positive, found " + batchSize;
this.batchSize = batchSize;
+ this.importer = importer;
initNative();
}
@@ -164,7 +161,6 @@ public class ColumnReader extends AbstractColumnReader {
currentVector.close();
currentVector = null;
}
- dictionaryProvider.close();
super.close();
}
@@ -209,10 +205,11 @@ public class ColumnReader extends AbstractColumnReader {
try (ArrowArray array = ArrowArray.wrap(addresses[0]);
ArrowSchema schema = ArrowSchema.wrap(addresses[1])) {
-      FieldVector vector = Data.importVector(ALLOCATOR, array, schema, dictionaryProvider);
+ FieldVector vector = importer.importVector(array, schema);
+
DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();
-      CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128, isUuid);
+      CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);
// Update whether the current vector contains any null values. This is used in the following
// batch(s) to determine whether we can skip loading the native vector.
@@ -234,15 +231,17 @@ public class ColumnReader extends AbstractColumnReader {
// We should already re-initiate `CometDictionary` here because `Data.importVector` API will
// release the previous dictionary vector and create a new one.
-      Dictionary arrowDictionary = dictionaryProvider.lookup(dictionaryEncoding.getId());
+      Dictionary arrowDictionary = importer.getProvider().lookup(dictionaryEncoding.getId());
CometPlainVector dictionaryVector =
new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid);
dictionary = new CometDictionary(dictionaryVector);
currentVector =
new CometDictionaryVector(
-              cometVector, dictionary, dictionaryProvider, useDecimal128, false, isUuid);
+              cometVector, dictionary, importer.getProvider(), useDecimal128, false, isUuid);
+ currentVector =
+          new CometDictionaryVector(cometVector, dictionary, importer.getProvider(), useDecimal128);
return currentVector;
}
}
diff --git a/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java b/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java
index a15d8419..dd08a88a 100644
--- a/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java
@@ -21,6 +21,7 @@ package org.apache.comet.parquet;
import java.io.IOException;
+import org.apache.arrow.c.CometSchemaImporter;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReader;
import org.apache.spark.sql.types.DataType;
@@ -45,10 +46,11 @@ public class LazyColumnReader extends ColumnReader {
public LazyColumnReader(
DataType sparkReadType,
ColumnDescriptor descriptor,
+ CometSchemaImporter importer,
int batchSize,
boolean useDecimal128,
boolean useLegacyDateTimestamp) {
-    super(sparkReadType, descriptor, batchSize, useDecimal128, useLegacyDateTimestamp);
+    super(sparkReadType, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
this.batchSize = 0; // the batch size is set later in `readBatch`
this.vector = new CometLazyVector(sparkReadType, this, useDecimal128);
}
diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java b/common/src/main/java/org/apache/comet/parquet/Utils.java
index 95ca06cd..99f3a4ed 100644
--- a/common/src/main/java/org/apache/comet/parquet/Utils.java
+++ b/common/src/main/java/org/apache/comet/parquet/Utils.java
@@ -19,6 +19,7 @@
package org.apache.comet.parquet;
+import org.apache.arrow.c.CometSchemaImporter;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
@@ -28,26 +29,29 @@ public class Utils {
public static ColumnReader getColumnReader(
DataType type,
ColumnDescriptor descriptor,
+ CometSchemaImporter importer,
int batchSize,
boolean useDecimal128,
boolean useLazyMaterialization) {
// TODO: support `useLegacyDateTimestamp` for Iceberg
return getColumnReader(
-        type, descriptor, batchSize, useDecimal128, useLazyMaterialization, true);
+        type, descriptor, importer, batchSize, useDecimal128, useLazyMaterialization, true);
}
public static ColumnReader getColumnReader(
DataType type,
ColumnDescriptor descriptor,
+ CometSchemaImporter importer,
int batchSize,
boolean useDecimal128,
boolean useLazyMaterialization,
boolean useLegacyDateTimestamp) {
if (useLazyMaterialization && supportLazyMaterialization(type)) {
return new LazyColumnReader(
- type, descriptor, batchSize, useDecimal128, useLegacyDateTimestamp);
+          type, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
} else {
-      return new ColumnReader(type, descriptor, batchSize, useDecimal128, useLegacyDateTimestamp);
+ return new ColumnReader(
+          type, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]