This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fcfe79ce3 Spark 3.2: Automatically set Arrow properties for read 
performance (#6671)
9fcfe79ce3 is described below

commit 9fcfe79ce3ef67ece696b7b2e854543990c79f19
Author: Wing Yew Poon <[email protected]>
AuthorDate: Thu Jan 26 01:05:43 2023 -0800

    Spark 3.2: Automatically set Arrow properties for read performance (#6671)
---
 spark/v3.2/build.gradle                            |  7 --
 ...dDictionaryEncodedFlatParquetDataBenchmark.java |  6 --
 .../VectorizedReadFlatParquetDataBenchmark.java    |  6 --
 .../vectorized/VectorizedSparkParquetReaders.java  | 84 ++++++++++++++++++++++
 .../iceberg/spark/source/BaseBatchReader.java      |  7 +-
 5 files changed, 85 insertions(+), 25 deletions(-)

diff --git a/spark/v3.2/build.gradle b/spark/v3.2/build.gradle
index c7bef9928d..d59a1907d8 100644
--- a/spark/v3.2/build.gradle
+++ b/spark/v3.2/build.gradle
@@ -106,13 +106,6 @@ 
project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
   }
 
   tasks.withType(Test) {
-    // For vectorized reads
-    // Allow unsafe memory access to avoid the costly check arrow does to 
check if index is within bounds
-    systemProperty("arrow.enable_unsafe_memory_access", "true")
-    // Disable expensive null check for every get(index) call.
-    // Iceberg manages nullability checks itself instead of relying on arrow.
-    systemProperty("arrow.enable_null_check_for_get", "false")
-
     // Vectorized reads need more memory
     maxHeapSize '2560m'
   }
diff --git 
a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
 
b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
index 97ec026a96..54488c55f2 100644
--- 
a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
+++ 
b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
@@ -56,12 +56,6 @@ public class 
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark
   public void setupBenchmark() {
     setupSpark(true);
     appendData();
-    // Allow unsafe memory access to avoid the costly check arrow does to 
check if index is within
-    // bounds
-    System.setProperty("arrow.enable_unsafe_memory_access", "true");
-    // Disable expensive null check for every get(index) call.
-    // Iceberg manages nullability checks itself instead of relying on arrow.
-    System.setProperty("arrow.enable_null_check_for_get", "false");
   }
 
   @Override
diff --git 
a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFlatParquetDataBenchmark.java
 
b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFlatParquetDataBenchmark.java
index c0f2436441..e64885d2d1 100644
--- 
a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFlatParquetDataBenchmark.java
+++ 
b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFlatParquetDataBenchmark.java
@@ -65,12 +65,6 @@ public class VectorizedReadFlatParquetDataBenchmark extends 
IcebergSourceBenchma
   public void setupBenchmark() {
     setupSpark();
     appendData();
-    // Allow unsafe memory access to avoid the costly check arrow does to 
check if index is within
-    // bounds
-    System.setProperty("arrow.enable_unsafe_memory_access", "true");
-    // Disable expensive null check for every get(index) call.
-    // Iceberg manages nullability checks itself instead of relying on arrow.
-    System.setProperty("arrow.enable_null_check_for_get", "false");
   }
 
   @TearDown
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
index bf85bdb7ed..7a84997950 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.data.vectorized;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
+import org.apache.arrow.vector.NullCheckingForGet;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.arrow.vectorized.VectorizedReaderBuilder;
 import org.apache.iceberg.data.DeleteFilter;
@@ -29,16 +30,43 @@ import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.parquet.schema.MessageType;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class VectorizedSparkParquetReaders {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(VectorizedSparkParquetReaders.class);
+  private static final String ENABLE_UNSAFE_MEMORY_ACCESS = 
"arrow.enable_unsafe_memory_access";
+  private static final String ENABLE_UNSAFE_MEMORY_ACCESS_ENV = 
"ARROW_ENABLE_UNSAFE_MEMORY_ACCESS";
+  private static final String ENABLE_NULL_CHECK_FOR_GET = 
"arrow.enable_null_check_for_get";
+  private static final String ENABLE_NULL_CHECK_FOR_GET_ENV = 
"ARROW_ENABLE_NULL_CHECK_FOR_GET";
+
+  static {
+    try {
+      enableUnsafeMemoryAccess();
+      disableNullCheckForGet();
+    } catch (Exception e) {
+      LOG.warn("Couldn't set Arrow properties, which may impact read 
performance", e);
+    }
+  }
+
   private VectorizedSparkParquetReaders() {}
 
+  /**
+   * @deprecated will be removed in 1.3.0, use {@link #buildReader(Schema, 
MessageType, Map,
+   *     DeleteFilter)} instead.
+   */
+  @Deprecated
   public static ColumnarBatchReader buildReader(
       Schema expectedSchema, MessageType fileSchema, boolean 
setArrowValidityVector) {
     return buildReader(expectedSchema, fileSchema, setArrowValidityVector, 
Maps.newHashMap());
   }
 
+  /**
+   * @deprecated will be removed in 1.3.0, use {@link #buildReader(Schema, 
MessageType, Map,
+   *     DeleteFilter)} instead.
+   */
+  @Deprecated
   public static ColumnarBatchReader buildReader(
       Schema expectedSchema,
       MessageType fileSchema,
@@ -56,6 +84,11 @@ public class VectorizedSparkParquetReaders {
                 ColumnarBatchReader::new));
   }
 
+  /**
+   * @deprecated will be removed in 1.3.0, use {@link #buildReader(Schema, 
MessageType, Map,
+   *     DeleteFilter)} instead.
+   */
+  @Deprecated
   public static ColumnarBatchReader buildReader(
       Schema expectedSchema,
       MessageType fileSchema,
@@ -75,6 +108,57 @@ public class VectorizedSparkParquetReaders {
                 deleteFilter));
   }
 
+  public static ColumnarBatchReader buildReader(
+      Schema expectedSchema,
+      MessageType fileSchema,
+      Map<Integer, ?> idToConstant,
+      DeleteFilter<InternalRow> deleteFilter) {
+    return (ColumnarBatchReader)
+        TypeWithSchemaVisitor.visit(
+            expectedSchema.asStruct(),
+            fileSchema,
+            new ReaderBuilder(
+                expectedSchema,
+                fileSchema,
+                NullCheckingForGet.NULL_CHECKING_ENABLED,
+                idToConstant,
+                ColumnarBatchReader::new,
+                deleteFilter));
+  }
+
+  // enables unsafe memory access to avoid costly checks to see if index is 
within bounds
+  // as long as it is not configured explicitly (see BoundsChecking in Arrow)
+  private static void enableUnsafeMemoryAccess() {
+    String value = confValue(ENABLE_UNSAFE_MEMORY_ACCESS, 
ENABLE_UNSAFE_MEMORY_ACCESS_ENV);
+    if (value == null) {
+      LOG.info("Enabling {}", ENABLE_UNSAFE_MEMORY_ACCESS);
+      System.setProperty(ENABLE_UNSAFE_MEMORY_ACCESS, "true");
+    } else {
+      LOG.info("Unsafe memory access was configured explicitly: {}", value);
+    }
+  }
+
+  // disables expensive null checks for every get call in favor of Iceberg 
nullability
+  // as long as it is not configured explicitly (see NullCheckingForGet in 
Arrow)
+  private static void disableNullCheckForGet() {
+    String value = confValue(ENABLE_NULL_CHECK_FOR_GET, 
ENABLE_NULL_CHECK_FOR_GET_ENV);
+    if (value == null) {
+      LOG.info("Disabling {}", ENABLE_NULL_CHECK_FOR_GET);
+      System.setProperty(ENABLE_NULL_CHECK_FOR_GET, "false");
+    } else {
+      LOG.info("Null checking for get calls was configured explicitly: {}", 
value);
+    }
+  }
+
+  private static String confValue(String propName, String envName) {
+    String propValue = System.getProperty(propName);
+    if (propValue != null) {
+      return propValue;
+    }
+
+    return System.getenv(envName);
+  }
+
   private static class ReaderBuilder extends VectorizedReaderBuilder {
     private final DeleteFilter<InternalRow> deleteFilter;
 
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index a6f02bddcb..d206bb8e2b 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.spark.source;
 
 import java.util.Map;
 import java.util.Set;
-import org.apache.arrow.vector.NullCheckingForGet;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.ScanTask;
@@ -88,11 +87,7 @@ abstract class BaseBatchReader<T extends ScanTask> extends 
BaseReader<ColumnarBa
         .createBatchedReaderFunc(
             fileSchema ->
                 VectorizedSparkParquetReaders.buildReader(
-                    requiredSchema,
-                    fileSchema, /* setArrowValidityVector */
-                    NullCheckingForGet.NULL_CHECKING_ENABLED,
-                    idToConstant,
-                    deleteFilter))
+                    requiredSchema, fileSchema, idToConstant, deleteFilter))
         .recordsPerBatch(batchSize)
         .filter(residual)
         .caseSensitive(caseSensitive())

Reply via email to