This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 479a97a7 fix: CometReader.loadVector should not overwrite dictionary ids (#476)
479a97a7 is described below

commit 479a97a7a40f13609d591a5a08c1265b1cc74fdd
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Mon May 27 18:55:19 2024 -0700

    fix: CometReader.loadVector should not overwrite dictionary ids (#476)
    
    * fix: CometReader.loadVector should not overwrite dictionary ids
    
    * For review
---
 .../org/apache/arrow/c/CometSchemaImporter.java    | 73 ++++++++++++++++++++++
 .../java/org/apache/comet/parquet/BatchReader.java | 13 ++++
 .../org/apache/comet/parquet/ColumnReader.java     | 23 ++++---
 .../org/apache/comet/parquet/LazyColumnReader.java |  4 +-
 .../main/java/org/apache/comet/parquet/Utils.java  | 10 ++-
 5 files changed, 107 insertions(+), 16 deletions(-)

diff --git a/common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java b/common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java
new file mode 100644
index 00000000..32955f1a
--- /dev/null
+++ b/common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.arrow.c;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.types.pojo.Field;
+
+/** This is a simple wrapper around SchemaImporter to make it accessible from Java Arrow. */
+public class CometSchemaImporter {
+  private final BufferAllocator allocator;
+  private final SchemaImporter importer;
+  private final CDataDictionaryProvider provider = new CDataDictionaryProvider();
+
+  public CometSchemaImporter(BufferAllocator allocator) {
+    this.allocator = allocator;
+    this.importer = new SchemaImporter(allocator);
+  }
+
+  public BufferAllocator getAllocator() {
+    return allocator;
+  }
+
+  public CDataDictionaryProvider getProvider() {
+    return provider;
+  }
+
+  public Field importField(ArrowSchema schema) {
+    try {
+      return importer.importField(schema, provider);
+    } finally {
+      schema.release();
+      schema.close();
+    }
+  }
+
+  /**
+   * Imports data from ArrowArray/ArrowSchema into a FieldVector. This is basically the same as Java
+   * Arrow `Data.importVector`. `Data.importVector` initiates `SchemaImporter` internally which is
+   * used to fill dictionary ids for dictionary encoded vectors. Every call to `importVector` will
+   * begin with dictionary ids starting from 0. So, separate calls to `importVector` will overwrite
+   * dictionary ids. To avoid this, we need to use the same `SchemaImporter` instance for all calls
+   * to `importVector`.
+   */
+  public FieldVector importVector(ArrowArray array, ArrowSchema schema) {
+    Field field = importField(schema);
+    FieldVector vector = field.createVector(allocator);
+    Data.importIntoVector(allocator, array, vector, provider);
+
+    return vector;
+  }
+
+  public void close() {
+    provider.close();
+  }
+}
diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
index 9940390d..bf8e6e55 100644
--- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
@@ -37,6 +37,9 @@ import scala.Option;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.arrow.c.CometSchemaImporter;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -88,6 +91,7 @@ import org.apache.comet.vector.CometVector;
  */
 public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(FileReader.class);
+  protected static final BufferAllocator ALLOCATOR = new RootAllocator();
 
   private Configuration conf;
   private int capacity;
@@ -104,6 +108,7 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Cl
   private MessageType requestedSchema;
   private CometVector[] vectors;
   private AbstractColumnReader[] columnReaders;
+  private CometSchemaImporter importer;
   private ColumnarBatch currentBatch;
   private Future<Option<Throwable>> prefetchTask;
   private LinkedBlockingQueue<Pair<PageReadStore, Long>> prefetchQueue;
@@ -515,6 +520,10 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Cl
       fileReader.close();
       fileReader = null;
     }
+    if (importer != null) {
+      importer.close();
+      importer = null;
+    }
   }
 
   @SuppressWarnings("deprecation")
@@ -552,6 +561,9 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Cl
       numRowGroupsMetric.add(1);
     }
 
+    if (importer != null) importer.close();
+    importer = new CometSchemaImporter(ALLOCATOR);
+
     List<ColumnDescriptor> columns = requestedSchema.getColumns();
     for (int i = 0; i < columns.size(); i++) {
       if (missingColumns[i]) continue;
@@ -564,6 +576,7 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Cl
           Utils.getColumnReader(
               dataType,
               columns.get(i),
+              importer,
               capacity,
               useDecimal128,
               useLazyMaterialization,
diff --git a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
index 50991d5b..3d4cb3aa 100644
--- a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
@@ -27,10 +27,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.arrow.c.ArrowArray;
 import org.apache.arrow.c.ArrowSchema;
-import org.apache.arrow.c.CDataDictionaryProvider;
-import org.apache.arrow.c.Data;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.c.CometSchemaImporter;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.dictionary.Dictionary;
 import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
@@ -53,7 +50,6 @@ import org.apache.comet.vector.CometVector;
 
 public class ColumnReader extends AbstractColumnReader {
   protected static final Logger LOG = LoggerFactory.getLogger(ColumnReader.class);
-  protected static final BufferAllocator ALLOCATOR = new RootAllocator();
 
   /**
    * The current Comet vector holding all the values read by this column reader. Owned by this
@@ -89,18 +85,19 @@ public class ColumnReader extends AbstractColumnReader {
    */
   boolean hadNull;
 
-  /** Dictionary provider for this column. */
-  private final CDataDictionaryProvider dictionaryProvider = new CDataDictionaryProvider();
+  private final CometSchemaImporter importer;
 
   public ColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
+      CometSchemaImporter importer,
       int batchSize,
       boolean useDecimal128,
       boolean useLegacyDateTimestamp) {
     super(type, descriptor, useDecimal128, useLegacyDateTimestamp);
     assert batchSize > 0 : "Batch size must be positive, found " + batchSize;
     this.batchSize = batchSize;
+    this.importer = importer;
     initNative();
   }
 
@@ -164,7 +161,6 @@ public class ColumnReader extends AbstractColumnReader {
       currentVector.close();
       currentVector = null;
     }
-    dictionaryProvider.close();
     super.close();
   }
 
@@ -209,10 +205,11 @@ public class ColumnReader extends AbstractColumnReader {
 
     try (ArrowArray array = ArrowArray.wrap(addresses[0]);
         ArrowSchema schema = ArrowSchema.wrap(addresses[1])) {
-      FieldVector vector = Data.importVector(ALLOCATOR, array, schema, dictionaryProvider);
+      FieldVector vector = importer.importVector(array, schema);
+
       DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();
 
-      CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128, isUuid);
+      CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);
 
       // Update whether the current vector contains any null values. This is used in the following
       // batch(s) to determine whether we can skip loading the native vector.
@@ -234,15 +231,17 @@ public class ColumnReader extends AbstractColumnReader {
 
       // We should already re-initiate `CometDictionary` here because `Data.importVector` API will
       // release the previous dictionary vector and create a new one.
-      Dictionary arrowDictionary = dictionaryProvider.lookup(dictionaryEncoding.getId());
+      Dictionary arrowDictionary = importer.getProvider().lookup(dictionaryEncoding.getId());
       CometPlainVector dictionaryVector =
           new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid);
       dictionary = new CometDictionary(dictionaryVector);
 
       currentVector =
           new CometDictionaryVector(
-              cometVector, dictionary, dictionaryProvider, useDecimal128, false, isUuid);
+              cometVector, dictionary, importer.getProvider(), useDecimal128, false, isUuid);
 
+      currentVector =
+          new CometDictionaryVector(cometVector, dictionary, importer.getProvider(), useDecimal128);
       return currentVector;
     }
   }
diff --git a/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java b/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java
index a15d8419..dd08a88a 100644
--- a/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java
@@ -21,6 +21,7 @@ package org.apache.comet.parquet;
 
 import java.io.IOException;
 
+import org.apache.arrow.c.CometSchemaImporter;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReader;
 import org.apache.spark.sql.types.DataType;
@@ -45,10 +46,11 @@ public class LazyColumnReader extends ColumnReader {
   public LazyColumnReader(
       DataType sparkReadType,
       ColumnDescriptor descriptor,
+      CometSchemaImporter importer,
       int batchSize,
       boolean useDecimal128,
       boolean useLegacyDateTimestamp) {
-    super(sparkReadType, descriptor, batchSize, useDecimal128, useLegacyDateTimestamp);
+    super(sparkReadType, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
     this.batchSize = 0; // the batch size is set later in `readBatch`
     this.vector = new CometLazyVector(sparkReadType, this, useDecimal128);
   }
diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java b/common/src/main/java/org/apache/comet/parquet/Utils.java
index 95ca06cd..99f3a4ed 100644
--- a/common/src/main/java/org/apache/comet/parquet/Utils.java
+++ b/common/src/main/java/org/apache/comet/parquet/Utils.java
@@ -19,6 +19,7 @@
 
 package org.apache.comet.parquet;
 
+import org.apache.arrow.c.CometSchemaImporter;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
@@ -28,26 +29,29 @@ public class Utils {
   public static ColumnReader getColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
+      CometSchemaImporter importer,
       int batchSize,
       boolean useDecimal128,
       boolean useLazyMaterialization) {
     // TODO: support `useLegacyDateTimestamp` for Iceberg
     return getColumnReader(
-        type, descriptor, batchSize, useDecimal128, useLazyMaterialization, true);
+        type, descriptor, importer, batchSize, useDecimal128, useLazyMaterialization, true);
   }
 
   public static ColumnReader getColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
+      CometSchemaImporter importer,
       int batchSize,
       boolean useDecimal128,
       boolean useLazyMaterialization,
       boolean useLegacyDateTimestamp) {
     if (useLazyMaterialization && supportLazyMaterialization(type)) {
       return new LazyColumnReader(
-          type, descriptor, batchSize, useDecimal128, useLegacyDateTimestamp);
+          type, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
     } else {
-      return new ColumnReader(type, descriptor, batchSize, useDecimal128, useLegacyDateTimestamp);
+      return new ColumnReader(
+          type, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to