This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 9fcfe79ce3 Spark 3.2: Automatically set Arrow properties for read performance (#6671)
9fcfe79ce3 is described below
commit 9fcfe79ce3ef67ece696b7b2e854543990c79f19
Author: Wing Yew Poon <[email protected]>
AuthorDate: Thu Jan 26 01:05:43 2023 -0800
Spark 3.2: Automatically set Arrow properties for read performance (#6671)
---
spark/v3.2/build.gradle | 7 --
...dDictionaryEncodedFlatParquetDataBenchmark.java | 6 --
.../VectorizedReadFlatParquetDataBenchmark.java | 6 --
.../vectorized/VectorizedSparkParquetReaders.java | 84 ++++++++++++++++++++++
.../iceberg/spark/source/BaseBatchReader.java | 7 +-
5 files changed, 85 insertions(+), 25 deletions(-)
diff --git a/spark/v3.2/build.gradle b/spark/v3.2/build.gradle
index c7bef9928d..d59a1907d8 100644
--- a/spark/v3.2/build.gradle
+++ b/spark/v3.2/build.gradle
@@ -106,13 +106,6 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
}
tasks.withType(Test) {
- // For vectorized reads
- // Allow unsafe memory access to avoid the costly check arrow does to check if index is within bounds
- systemProperty("arrow.enable_unsafe_memory_access", "true")
- // Disable expensive null check for every get(index) call.
- // Iceberg manages nullability checks itself instead of relying on arrow.
- systemProperty("arrow.enable_null_check_for_get", "false")
-
// Vectorized reads need more memory
maxHeapSize '2560m'
}
diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
index 97ec026a96..54488c55f2 100644
--- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
+++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.java
@@ -56,12 +56,6 @@ public class VectorizedReadDictionaryEncodedFlatParquetDataBenchmark
public void setupBenchmark() {
setupSpark(true);
appendData();
- // Allow unsafe memory access to avoid the costly check arrow does to check if index is within
- // bounds
- System.setProperty("arrow.enable_unsafe_memory_access", "true");
- // Disable expensive null check for every get(index) call.
- // Iceberg manages nullability checks itself instead of relying on arrow.
- System.setProperty("arrow.enable_null_check_for_get", "false");
}
@Override
diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFlatParquetDataBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFlatParquetDataBenchmark.java
index c0f2436441..e64885d2d1 100644
--- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFlatParquetDataBenchmark.java
+++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/vectorized/VectorizedReadFlatParquetDataBenchmark.java
@@ -65,12 +65,6 @@ public class VectorizedReadFlatParquetDataBenchmark extends IcebergSourceBenchma
public void setupBenchmark() {
setupSpark();
appendData();
- // Allow unsafe memory access to avoid the costly check arrow does to check if index is within
- // bounds
- System.setProperty("arrow.enable_unsafe_memory_access", "true");
- // Disable expensive null check for every get(index) call.
- // Iceberg manages nullability checks itself instead of relying on arrow.
- System.setProperty("arrow.enable_null_check_for_get", "false");
}
@TearDown
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
index bf85bdb7ed..7a84997950 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.data.vectorized;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
+import org.apache.arrow.vector.NullCheckingForGet;
import org.apache.iceberg.Schema;
import org.apache.iceberg.arrow.vectorized.VectorizedReaderBuilder;
import org.apache.iceberg.data.DeleteFilter;
@@ -29,16 +30,43 @@ import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class VectorizedSparkParquetReaders {
+ private static final Logger LOG = LoggerFactory.getLogger(VectorizedSparkParquetReaders.class);
+ private static final String ENABLE_UNSAFE_MEMORY_ACCESS = "arrow.enable_unsafe_memory_access";
+ private static final String ENABLE_UNSAFE_MEMORY_ACCESS_ENV = "ARROW_ENABLE_UNSAFE_MEMORY_ACCESS";
+ private static final String ENABLE_NULL_CHECK_FOR_GET = "arrow.enable_null_check_for_get";
+ private static final String ENABLE_NULL_CHECK_FOR_GET_ENV = "ARROW_ENABLE_NULL_CHECK_FOR_GET";
+
+ static {
+ try {
+ enableUnsafeMemoryAccess();
+ disableNullCheckForGet();
+ } catch (Exception e) {
+ LOG.warn("Couldn't set Arrow properties, which may impact read performance", e);
+ }
+ }
+
private VectorizedSparkParquetReaders() {}
+ /**
+ * @deprecated will be removed in 1.3.0, use {@link #buildReader(Schema, MessageType, Map,
+ * DeleteFilter)} instead.
+ */
+ @Deprecated
public static ColumnarBatchReader buildReader(
Schema expectedSchema, MessageType fileSchema, boolean setArrowValidityVector) {
return buildReader(expectedSchema, fileSchema, setArrowValidityVector, Maps.newHashMap());
}
+ /**
+ * @deprecated will be removed in 1.3.0, use {@link #buildReader(Schema, MessageType, Map,
+ * DeleteFilter)} instead.
+ */
+ @Deprecated
public static ColumnarBatchReader buildReader(
Schema expectedSchema,
MessageType fileSchema,
@@ -56,6 +84,11 @@ public class VectorizedSparkParquetReaders {
ColumnarBatchReader::new));
}
+ /**
+ * @deprecated will be removed in 1.3.0, use {@link #buildReader(Schema, MessageType, Map,
+ * DeleteFilter)} instead.
+ */
+ @Deprecated
public static ColumnarBatchReader buildReader(
Schema expectedSchema,
MessageType fileSchema,
@@ -75,6 +108,57 @@ public class VectorizedSparkParquetReaders {
deleteFilter));
}
+ public static ColumnarBatchReader buildReader(
+ Schema expectedSchema,
+ MessageType fileSchema,
+ Map<Integer, ?> idToConstant,
+ DeleteFilter<InternalRow> deleteFilter) {
+ return (ColumnarBatchReader)
+ TypeWithSchemaVisitor.visit(
+ expectedSchema.asStruct(),
+ fileSchema,
+ new ReaderBuilder(
+ expectedSchema,
+ fileSchema,
+ NullCheckingForGet.NULL_CHECKING_ENABLED,
+ idToConstant,
+ ColumnarBatchReader::new,
+ deleteFilter));
+ }
+
+ // enables unsafe memory access to avoid costly checks to see if index is within bounds
+ // as long as it is not configured explicitly (see BoundsChecking in Arrow)
+ private static void enableUnsafeMemoryAccess() {
+ String value = confValue(ENABLE_UNSAFE_MEMORY_ACCESS, ENABLE_UNSAFE_MEMORY_ACCESS_ENV);
+ if (value == null) {
+ LOG.info("Enabling {}", ENABLE_UNSAFE_MEMORY_ACCESS);
+ System.setProperty(ENABLE_UNSAFE_MEMORY_ACCESS, "true");
+ } else {
+ LOG.info("Unsafe memory access was configured explicitly: {}", value);
+ }
+ }
+
+ // disables expensive null checks for every get call in favor of Iceberg nullability
+ // as long as it is not configured explicitly (see NullCheckingForGet in Arrow)
+ private static void disableNullCheckForGet() {
+ String value = confValue(ENABLE_NULL_CHECK_FOR_GET, ENABLE_NULL_CHECK_FOR_GET_ENV);
+ if (value == null) {
+ LOG.info("Disabling {}", ENABLE_NULL_CHECK_FOR_GET);
+ System.setProperty(ENABLE_NULL_CHECK_FOR_GET, "false");
+ } else {
+ LOG.info("Null checking for get calls was configured explicitly: {}", value);
+ }
+ }
+
+ private static String confValue(String propName, String envName) {
+ String propValue = System.getProperty(propName);
+ if (propValue != null) {
+ return propValue;
+ }
+
+ return System.getenv(envName);
+ }
+
private static class ReaderBuilder extends VectorizedReaderBuilder {
private final DeleteFilter<InternalRow> deleteFilter;
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index a6f02bddcb..d206bb8e2b 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.spark.source;
import java.util.Map;
import java.util.Set;
-import org.apache.arrow.vector.NullCheckingForGet;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.ScanTask;
@@ -88,11 +87,7 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
.createBatchedReaderFunc(
fileSchema ->
VectorizedSparkParquetReaders.buildReader(
- requiredSchema,
- fileSchema, /* setArrowValidityVector */
- NullCheckingForGet.NULL_CHECKING_ENABLED,
- idToConstant,
- deleteFilter))
+ requiredSchema, fileSchema, idToConstant, deleteFilter))
.recordsPerBatch(batchSize)
.filter(residual)
.caseSensitive(caseSensitive())
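
For readers of this change: the new confValue helper means the defaults are applied only when neither the JVM system property nor the matching ARROW_* environment variable is already set. A minimal, hypothetical sketch (not part of this commit) of how a job could opt back into Arrow's default checks, assuming the values are set before the vectorized readers and the Arrow classes are first loaded:

    // Hypothetical override, e.g. early in driver/executor startup:
    // keep Arrow's bounds check and per-get null check despite the new defaults.
    // The static initializer in VectorizedSparkParquetReaders skips any value
    // that is already configured, so these explicit settings are preserved.
    System.setProperty("arrow.enable_unsafe_memory_access", "false");
    System.setProperty("arrow.enable_null_check_for_get", "true");

The same effect should be achievable by exporting ARROW_ENABLE_UNSAFE_MEMORY_ACCESS / ARROW_ENABLE_NULL_CHECK_FOR_GET, since confValue also treats an environment variable as an explicit configuration.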