This is an automated email from the ASF dual-hosted git repository.
lihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new fff24a42 [AURON-1963] Refactor SparkAuronConfiguration and remove deprecated AuronConf classes (#1964)
fff24a42 is described below
commit fff24a4234a015652a40f593ad9524affead22a9
Author: Zhang Li <[email protected]>
AuthorDate: Thu Jan 29 15:26:44 2026 +0800
[AURON-1963] Refactor SparkAuronConfiguration and remove deprecated AuronConf classes (#1964)
# Which issue does this PR close?
Closes #1963
# Rationale for this change
Consolidate configuration handling on a single fluent ConfigOption API and drop the deprecated AuronConf and ConfigOptions classes that duplicated it.
# What changes are included in this PR?
- ConfigOption is rewritten as a fluent, self-describing option (withKey/addAltKey/withCategory/withDescription/withDefaultValue/withDynamicDefaultValue) with a get() convenience; the ConfigOptions builder is removed.
- The core JniBridge gains intConf/longConf/doubleConf/booleanConf/stringConf accessors, and the native engine calls them instead of the removed spark-side AuronConf.
- SparkAuronConfiguration is rewritten on top of the new API, a SparkAuronConfigurationDocGenerator is added, and spark-side call sites and tests are updated accordingly.
# Are there any user-facing changes?
# How was this patch tested?
---
.../auron/configuration/AuronConfiguration.java | 92 +--
.../apache/auron/configuration/ConfigOption.java | 109 ++-
.../apache/auron/configuration/ConfigOptions.java | 174 -----
.../main/java/org/apache/auron/jni/JniBridge.java | 34 +
.../configuration/AuronConfigurationTest.java | 54 --
.../auron/configuration/ConfigOptionTest.java | 51 --
.../configuration/MockAuronConfiguration.java | 64 +-
native-engine/auron-jni-bridge/src/conf.rs | 10 +-
native-engine/auron-jni-bridge/src/jni_bridge.rs | 95 +--
.../ForceApplyShuffledHashJoinInterceptor.java | 3 +-
.../org/apache/spark/sql/auron/ShimsImpl.scala | 14 +-
.../scala/org/apache/auron/AuronQuerySuite.scala | 6 +-
.../org/apache/auron/NativeConvertersSuite.scala | 10 +-
.../org/apache/auron/jni/SparkAuronAdaptor.java | 3 +-
.../configuration/SparkAuronConfiguration.java | 832 ++++++++++++++-------
.../SparkAuronConfigurationDocGenerator.java | 73 ++
.../java/org/apache/spark/sql/auron/AuronConf.java | 197 -----
.../java/org/apache/spark/sql/auron/JniBridge.java | 146 ----
.../spark/sql/auron/AuronCallNativeWrapper.scala | 238 ------
.../apache/spark/sql/auron/AuronConverters.scala | 76 +-
.../sql/auron/AuronSparkSessionExtension.scala | 11 +-
.../apache/spark/sql/auron/NativeConverters.scala | 38 +-
.../scala/org/apache/spark/sql/auron/Shims.scala | 1 +
.../configuration/SparkAuronConfigurationTest.java | 77 --
.../hive/auron/paimon/PaimonConvertProvider.scala | 6 +-
25 files changed, 904 insertions(+), 1510 deletions(-)
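
As context for the diff below, the core API change is that options are no longer assembled through the ConfigOptions builder but constructed directly as fluent ConfigOption instances. A minimal before/after sketch, using the BATCH_SIZE option from this commit:

    // old style (removed by this commit)
    public static final ConfigOption<Integer> BATCH_SIZE = ConfigOptions.key("auron.batchSize")
            .description("Suggested batch size for arrow batches.")
            .intType()
            .defaultValue(10000);

    // new style (introduced by this commit)
    public static final ConfigOption<Integer> BATCH_SIZE = new ConfigOption<>(Integer.class)
            .withKey("auron.batchSize")
            .withDescription("Suggested batch size for arrow batches.")
            .withDefaultValue(10000);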
diff --git
a/auron-core/src/main/java/org/apache/auron/configuration/AuronConfiguration.java
b/auron-core/src/main/java/org/apache/auron/configuration/AuronConfiguration.java
index d6acc489..31017d8b 100644
---
a/auron-core/src/main/java/org/apache/auron/configuration/AuronConfiguration.java
+++
b/auron-core/src/main/java/org/apache/auron/configuration/AuronConfiguration.java
@@ -23,26 +23,24 @@ import java.util.Optional;
*/
public abstract class AuronConfiguration {
- public static final ConfigOption<Integer> BATCH_SIZE =
ConfigOptions.key("auron.batchSize")
- .description("Suggested batch size for arrow batches.")
- .intType()
- .defaultValue(10000);
-
- public static final ConfigOption<Double> MEMORY_FRACTION =
ConfigOptions.key("auron.memoryFraction")
- .description("Suggested fraction of off-heap memory used in native
execution. "
+ public static final ConfigOption<Integer> BATCH_SIZE = new
ConfigOption<>(Integer.class)
+ .withKey("auron.batchSize")
+ .withDescription("Suggested batch size for arrow batches.")
+ .withDefaultValue(10000);
+
+ public static final ConfigOption<Double> MEMORY_FRACTION = new
ConfigOption<>(Double.class)
+ .withKey("auron.memoryFraction")
+ .withDescription("Suggested fraction of off-heap memory used in
native execution. "
+ "actual off-heap memory usage is expected to be
spark.executor.memoryOverhead * fraction.")
- .doubleType()
- .defaultValue(0.6);
+ .withDefaultValue(0.6);
- public static final ConfigOption<String> NATIVE_LOG_LEVEL =
ConfigOptions.key("auron.native.log.level")
- .description("Log level for native execution.")
- .stringType()
- .defaultValue("info");
+ public static final ConfigOption<String> NATIVE_LOG_LEVEL = new
ConfigOption<>(String.class)
+ .withKey("auron.native.log.level")
+ .withDescription("Log level for native execution.")
+ .withDefaultValue("info");
public abstract <T> Optional<T> getOptional(ConfigOption<T> option);
- public abstract <T> Optional<T> getOptional(String key);
-
public <T> T get(ConfigOption<T> option) {
return getOptional(option).orElseGet(() ->
getOptionDefaultValue(option));
}
@@ -57,18 +55,6 @@ public abstract class AuronConfiguration {
return getOptional(configOption).orElseGet(() ->
getOptionDefaultValue(configOption));
}
- /**
- * Returns the value associated with the given config option as a string.
If no value is mapped
- * under any key of the option, it returns the specified default instead
of the option's default
- * value.
- *
- * @param configOption The configuration option
- * @return the (default) value associated with the given config option
- */
- public String getString(ConfigOption<String> configOption, String
overrideDefault) {
- return getOptional(configOption).orElse(overrideDefault);
- }
-
/**
* Returns the value associated with the given config option as an integer.
*
@@ -79,19 +65,6 @@ public abstract class AuronConfiguration {
return getOptional(configOption).orElseGet(() ->
getOptionDefaultValue(configOption));
}
- /**
- * Returns the value associated with the given config option as an
integer. If no value is
- * mapped under any key of the option, it returns the specified default
instead of the option's
- * default value.
- *
- * @param configOption The configuration option
- * @param overrideDefault The value to return if no value was mapped for
any key of the option
- * @return the configured value associated with the given config option,
or the overrideDefault
- */
- public int getInteger(ConfigOption<Integer> configOption, int
overrideDefault) {
- return getOptional(configOption).orElse(overrideDefault);
- }
-
/**
* Returns the value associated with the given config option as a long
integer.
*
@@ -102,19 +75,6 @@ public abstract class AuronConfiguration {
return getOptional(configOption).orElseGet(() ->
getOptionDefaultValue(configOption));
}
- /**
- * Returns the value associated with the given config option as a long
integer. If no value is
- * mapped under any key of the option, it returns the specified default
instead of the option's
- * default value.
- *
- * @param configOption The configuration option
- * @param overrideDefault The value to return if no value was mapped for
any key of the option
- * @return the configured value associated with the given config option,
or the overrideDefault
- */
- public long getLong(ConfigOption<Long> configOption, long overrideDefault)
{
- return getOptional(configOption).orElse(overrideDefault);
- }
-
/**
* Returns the value associated with the given config option as a boolean.
*
@@ -148,19 +108,6 @@ public abstract class AuronConfiguration {
return getOptional(configOption).orElseGet(() ->
getOptionDefaultValue(configOption));
}
- /**
- * Returns the value associated with the given config option as a float.
If no value is mapped
- * under any key of the option, it returns the specified default instead
of the option's default
- * value.
- *
- * @param configOption The configuration option
- * @param overrideDefault The value to return if no value was mapped for
any key of the option
- * @return the configured value associated with the given config option,
or the overrideDefault
- */
- public float getFloat(ConfigOption<Float> configOption, float
overrideDefault) {
- return getOptional(configOption).orElse(overrideDefault);
- }
-
/**
* Returns the value associated with the given config option as a {@code
double}.
*
@@ -171,19 +118,6 @@ public abstract class AuronConfiguration {
return getOptional(configOption).orElseGet(() ->
getOptionDefaultValue(configOption));
}
- /**
- * Returns the value associated with the given config option as a {@code
double}. If no value is
- * mapped under any key of the option, it returns the specified default
instead of the option's
- * default value.
- *
- * @param configOption The configuration option
- * @param overrideDefault The value to return if no value was mapped for
any key of the option
- * @return the configured value associated with the given config option,
or the overrideDefault
- */
- public double getDouble(ConfigOption<Double> configOption, double
overrideDefault) {
- return getOptional(configOption).orElse(overrideDefault);
- }
-
/**
* Returns the value associated with the given config option as a {@code
double}.
*
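
The overloads removed above (getString/getInteger/getLong/getFloat/getDouble with an explicit override default) were thin wrappers over getOptional(...).orElse(...), so any remaining caller can inline that directly. A small sketch, assuming config is an AuronConfiguration instance and 8192 is just an illustrative override:

    // equivalent of the removed getInteger(option, overrideDefault)
    int batchSize = config.getOptional(AuronConfiguration.BATCH_SIZE).orElse(8192);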
diff --git
a/auron-core/src/main/java/org/apache/auron/configuration/ConfigOption.java
b/auron-core/src/main/java/org/apache/auron/configuration/ConfigOption.java
index 925f3ddd..04a4e0f2 100644
--- a/auron-core/src/main/java/org/apache/auron/configuration/ConfigOption.java
+++ b/auron-core/src/main/java/org/apache/auron/configuration/ConfigOption.java
@@ -16,9 +16,10 @@
*/
package org.apache.auron.configuration;
-import static org.apache.auron.util.Preconditions.checkNotNull;
-
+import java.util.ArrayList;
+import java.util.List;
import java.util.function.Function;
+import org.apache.auron.jni.AuronAdaptor;
/**
* A {@code ConfigOption} describes a configuration parameter. It encapsulates
the configuration
@@ -35,16 +36,22 @@ public class ConfigOption<T> {
public static final String EMPTY_DESCRIPTION = "";
/** The current key for that config option. */
- private final String key;
+ private String key;
+
+ /** The alternative keys for that config option. */
+ private List<String> altKeys = new ArrayList<>();
/** The default value for this option. */
- private final T defaultValue;
+ private T defaultValue;
+
+ /** The current category for that config option. */
+ private String category = "Uncategorized";
/** The description for this option. */
- private final String description;
+ private String description;
/** The function to compute the default value. */
- private final Function<AuronConfiguration, T> dynamicDefaultValueFunction;
+ private Function<AuronConfiguration, T> dynamicDefaultValueFunction;
/**
* Type of the value that this ConfigOption describes.
@@ -55,27 +62,40 @@ public class ConfigOption<T> {
* <li>typeClass == atomic class and isList == true for {@code
ConfigOption<List<Integer>>}
* </ul>
*/
- private final Class<?> clazz;
+ private Class<T> clazz;
- /**
- * Creates a new config option with fallback keys.
- *
- * @param key The current key for that config option
- * @param clazz describes type of the ConfigOption, see description of the
clazz field
- * @param description Description for that option
- * @param defaultValue The default value for this option
- */
- ConfigOption(
- String key,
- Class<?> clazz,
- T defaultValue,
- String description,
- Function<AuronConfiguration, T> dynamicDefaultValueFunction) {
- this.key = checkNotNull(key);
- this.description = description;
+ public ConfigOption(Class<T> clazz) {
+ this.clazz = clazz;
+ }
+
+ public ConfigOption<T> withKey(String key) {
+ this.key = key;
+ return this;
+ }
+
+ public ConfigOption<T> addAltKey(String altKey) {
+ this.altKeys.add(altKey);
+ return this;
+ }
+
+ public ConfigOption<T> withDefaultValue(T defaultValue) {
this.defaultValue = defaultValue;
- this.clazz = checkNotNull(clazz);
+ return this;
+ }
+
+ public ConfigOption<T> withCategory(String category) {
+ this.category = category;
+ return this;
+ }
+
+ public ConfigOption<T> withDescription(String description) {
+ this.description = description;
+ return this;
+ }
+
+ public ConfigOption<T>
withDynamicDefaultValue(Function<AuronConfiguration, T>
dynamicDefaultValueFunction) {
this.dynamicDefaultValueFunction = dynamicDefaultValueFunction;
+ return this;
}
/**
@@ -88,21 +108,24 @@ public class ConfigOption<T> {
}
/**
- * Gets the description of configuration key
- *
- * @return
+ * Gets the alternative configuration keys.
*/
- public String description() {
- return description;
+ public List<String> altKeys() {
+ return altKeys;
}
/**
- * Checks if this option has a default value.
- *
- * @return True if it has a default value, false if not.
+ * Gets the category of configuration key
*/
- public boolean hasDefaultValue() {
- return defaultValue != null;
+ public String category() {
+ return category;
+ }
+
+ /**
+ * Gets the description of configuration key
+ */
+ public String description() {
+ return description;
}
/**
@@ -131,4 +154,22 @@ public class ConfigOption<T> {
public Function<AuronConfiguration, T> dynamicDefaultValueFunction() {
return dynamicDefaultValueFunction;
}
+
+ /**
+ * Gets the type class of the value that this ConfigOption describes.
+ *
+ * @return The type class of the value that this ConfigOption describes.
+ */
+ public Class<T> getValueClass() {
+ return clazz;
+ }
+
+ /**
+ * Retrieves the current value of this configuration option.
+ *
+ * @return the current value associated with this configuration option.
+ */
+ public T get() {
+ return AuronAdaptor.getInstance().getAuronConfiguration().get(this);
+ }
}
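
Putting the reworked ConfigOption together: an option now carries alternative keys, a category and an optional dynamic default, and get() resolves the current value through AuronAdaptor.getInstance().getAuronConfiguration(). A sketch using only methods from this diff; the alternative key shown here is purely illustrative:

    ConfigOption<Integer> minRows = new ConfigOption<>(Integer.class)
            .withKey("auron.partialAggSkipping.minRows")
            .addAltKey("auron.partialAggSkipping.min.rows")          // illustrative alternative key
            .withCategory("Partial Aggregate Skipping")
            .withDescription("Minimum number of rows to trigger partial aggregate skipping.")
            .withDynamicDefaultValue(conf -> conf.get(AuronConfiguration.BATCH_SIZE) * 5);

    int value = minRows.get();  // delegates to AuronAdaptor.getInstance().getAuronConfiguration().get(minRows)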
diff --git
a/auron-core/src/main/java/org/apache/auron/configuration/ConfigOptions.java
b/auron-core/src/main/java/org/apache/auron/configuration/ConfigOptions.java
deleted file mode 100644
index 47efc98e..00000000
--- a/auron-core/src/main/java/org/apache/auron/configuration/ConfigOptions.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.auron.configuration;
-
-import static org.apache.auron.util.Preconditions.checkNotNull;
-
-import java.util.function.Function;
-
-/**
- * Refer to the design of the Flink engine.
- * {@code ConfigOptions} are used to build a {@link ConfigOption}. The option
is typically built in
- * one of the following pattern:
- *
- * <pre>{@code
- * // simple string-valued option with a default value
- * ConfigOption<String> tempDirs = ConfigOptions
- * .key("tmp.dir")
- * .stringType()
- * .defaultValue("/tmp");
- *
- * // simple string-valued option with a default value and with the description
- * ConfigOption<String> tempDirs = ConfigOptions
- * .key("tmp.dir")
- * .description("this is a example of string")
- * .stringType()
- * .defaultValue("/tmp");
- *
- * // simple integer-valued option with a default value
- * ConfigOption<Integer> batchSize = ConfigOptions
- * .key("batch.size")
- * .intType()
- * .defaultValue(100);
- *
- * // option with no default value
- * ConfigOption<String> userName = ConfigOptions
- * .key("user.name")
- * .stringType()
- * .noDefaultValue();
- * }</pre>
- */
-public class ConfigOptions {
-
- /**
- * Starts building a new {@link ConfigOption}.
- *
- * @param key The key for the config option.
- * @return The builder for the config option with the given key.
- */
- public static OptionBuilder key(String key) {
- checkNotNull(key);
- return new OptionBuilder(key);
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * The option builder is used to create a {@link ConfigOption}. It is
instantiated via {@link
- * ConfigOptions#key(String)}.
- */
- public static final class OptionBuilder {
-
- /** The key for the config option. */
- private final String key;
-
- private String description = ConfigOption.EMPTY_DESCRIPTION;
-
- /**
- * Creates a new OptionBuilder.
- * @param key The key for the config option
- */
- OptionBuilder(String key) {
- this.key = key;
- }
-
- OptionBuilder(String key, String description) {
- this.key = key;
- this.description = description;
- }
-
- public OptionBuilder description(String description) {
- return new OptionBuilder(key, description);
- }
-
- /** Defines that the value of the option should be of {@link Boolean}
type. */
- public TypedConfigOptionBuilder<Boolean> booleanType() {
- return new TypedConfigOptionBuilder<>(key, Boolean.class,
description);
- }
-
- /** Defines that the value of the option should be of {@link Integer}
type. */
- public TypedConfigOptionBuilder<Integer> intType() {
- return new TypedConfigOptionBuilder<>(key, Integer.class,
description);
- }
-
- /** Defines that the value of the option should be of {@link Long}
type. */
- public TypedConfigOptionBuilder<Long> longType() {
- return new TypedConfigOptionBuilder<>(key, Long.class,
description);
- }
-
- /** Defines that the value of the option should be of {@link Float}
type. */
- public TypedConfigOptionBuilder<Float> floatType() {
- return new TypedConfigOptionBuilder<>(key, Float.class,
description);
- }
-
- /** Defines that the value of the option should be of {@link Double}
type. */
- public TypedConfigOptionBuilder<Double> doubleType() {
- return new TypedConfigOptionBuilder<>(key, Double.class,
description);
- }
-
- /** Defines that the value of the option should be of {@link String}
type. */
- public TypedConfigOptionBuilder<String> stringType() {
- return new TypedConfigOptionBuilder<>(key, String.class,
description);
- }
- }
-
- /**
- * Builder for {@link ConfigOption} with a defined atomic type.
- *
- * @param <T> atomic type of the option
- */
- public static class TypedConfigOptionBuilder<T> {
- private final String key;
- private final Class<T> clazz;
-
- private final String description;
-
- TypedConfigOptionBuilder(String key, Class<T> clazz, String
description) {
- this.key = key;
- this.clazz = clazz;
- this.description = description;
- }
-
- /**
- * Creates a ConfigOption with the given default value.
- *
- * @param value The default value for the config option
- * @return The config option with the default value.
- */
- public ConfigOption<T> defaultValue(T value) {
- return new ConfigOption<>(key, clazz, value, description, null);
- }
-
- /**
- * Creates a ConfigOption without a default value.
- *
- * @return The config option without a default value.
- */
- public ConfigOption<T> noDefaultValue() {
- return new ConfigOption<>(key, clazz, null, description, null);
- }
-
- public ConfigOption<T>
dynamicDefaultValue(Function<AuronConfiguration, T>
dynamicDefaultValueFunction) {
- return new ConfigOption<>(key, clazz, null, description,
dynamicDefaultValueFunction);
- }
- }
-
- // ------------------------------------------------------------------------
-
- /** Not intended to be instantiated. */
- private ConfigOptions() {}
-}
diff --git a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
index 4d7edbf8..121970c2 100644
--- a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
+++ b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
@@ -23,10 +23,13 @@ import java.net.URI;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.auron.configuration.AuronConfiguration;
+import org.apache.auron.configuration.ConfigOption;
import org.apache.auron.functions.AuronUDFWrapperContext;
import org.apache.auron.hadoop.fs.FSDataInputWrapper;
import org.apache.auron.hadoop.fs.FSDataOutputWrapper;
import org.apache.auron.memory.OnHeapSpillManager;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -106,4 +109,35 @@ public class JniBridge {
public static AuronUDFWrapperContext getAuronUDFWrapperContext(ByteBuffer
udfSerialized) {
return
AuronAdaptor.getInstance().getAuronUDFWrapperContext(udfSerialized);
}
+
+ public static int intConf(String confKey) {
+ return getConfValue(confKey);
+ }
+
+ public static long longConf(String confKey) {
+ return getConfValue(confKey);
+ }
+
+ public static double doubleConf(String confKey) {
+ return getConfValue(confKey);
+ }
+
+ public static boolean booleanConf(String confKey) {
+ return getConfValue(confKey);
+ }
+
+ public static String stringConf(String confKey) {
+ return getConfValue(confKey);
+ }
+
+ static <T> T getConfValue(String confKey) {
+ Class<? extends AuronConfiguration> confClass =
+ AuronAdaptor.getInstance().getAuronConfiguration().getClass();
+ try {
+ ConfigOption<T> configOption = (ConfigOption<T>)
FieldUtils.readStaticField(confClass, confKey);
+ return configOption.get();
+ } catch (IllegalAccessException | ClassCastException e) {
+ throw new RuntimeException("error reading conf value: " + confKey,
e);
+ }
+ }
}
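
Note how the new accessors resolve a value: getConfValue interprets confKey as the name of a public static ConfigOption field on the active AuronConfiguration subclass, reads it with commons-lang3 FieldUtils, and returns ConfigOption.get(). That implies the native side now passes field names rather than property keys (an inference from this Java code; the Rust key() implementations are not shown in this diff). A hedged sketch of what a single lookup amounts to:

    // roughly what JniBridge.booleanConf("FORCE_SHUFFLED_HASH_JOIN") does
    static boolean forceShuffledHashJoin() throws IllegalAccessException {
        Class<?> confClass = AuronAdaptor.getInstance().getAuronConfiguration().getClass();
        @SuppressWarnings("unchecked")
        ConfigOption<Boolean> option =
                (ConfigOption<Boolean>) FieldUtils.readStaticField(confClass, "FORCE_SHUFFLED_HASH_JOIN");
        return option.get();
    }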
diff --git
a/auron-core/src/test/java/org/apache/auron/configuration/AuronConfigurationTest.java
b/auron-core/src/test/java/org/apache/auron/configuration/AuronConfigurationTest.java
deleted file mode 100644
index afee2f59..00000000
---
a/auron-core/src/test/java/org/apache/auron/configuration/AuronConfigurationTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.auron.configuration;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-/**
- * This is a test class for {@link AuronConfiguration}.
- */
-public class AuronConfigurationTest {
-
- private MockAuronConfiguration config;
-
- @BeforeEach
- public void setUp() {
- config = new MockAuronConfiguration();
-
config.addConfig(MockAuronConfiguration.STRING_WITHOUT_DEFAULT_CONFIG_OPTION.key(),
"str1");
- config.addConfig(MockAuronConfiguration.INT_CONFIG_OPTION.key(), 100);
- config.addConfig(MockAuronConfiguration.BOOLEAN_CONFIG_OPTION.key(),
false);
- config.addConfig(MockAuronConfiguration.DOUBLE_CONFIG_OPTION.key(),
99.9);
- config.addConfig(MockAuronConfiguration.LONG_CONFIG_OPTION.key(),
10000000000L);
- config.addConfig(MockAuronConfiguration.FLOAT_CONFIG_OPTION.key(),
1.2f);
- }
-
- @Test
- public void testGetConfig() {
- assertEquals("str1",
config.get(MockAuronConfiguration.STRING_WITHOUT_DEFAULT_CONFIG_OPTION));
- assertEquals("zm",
config.get(MockAuronConfiguration.STRING_CONFIG_OPTION));
- assertEquals(100,
config.getInteger(MockAuronConfiguration.INT_CONFIG_OPTION));
- assertEquals(false,
config.get(MockAuronConfiguration.BOOLEAN_CONFIG_OPTION));
- assertEquals(99.9,
config.get(MockAuronConfiguration.DOUBLE_CONFIG_OPTION), 0.0000000001);
- assertEquals(10000000000L,
config.getLong(MockAuronConfiguration.LONG_CONFIG_OPTION));
- assertEquals(1.2f,
config.get(MockAuronConfiguration.FLOAT_CONFIG_OPTION), 0.0000000001);
- // test dynamic default value
- assertEquals(500,
config.getInteger(MockAuronConfiguration.INT_WITH_DYNAMIC_DEFAULT_CONFIG_OPTION));
- }
-}
diff --git
a/auron-core/src/test/java/org/apache/auron/configuration/ConfigOptionTest.java
b/auron-core/src/test/java/org/apache/auron/configuration/ConfigOptionTest.java
deleted file mode 100644
index 0242974a..00000000
---
a/auron-core/src/test/java/org/apache/auron/configuration/ConfigOptionTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.auron.configuration;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import org.junit.jupiter.api.Test;
-
-/** Tests for the {@link ConfigOption}. */
-public class ConfigOptionTest {
-
- @Test
- public void testConfigOption() {
- ConfigOption<String> keyOption =
ConfigOptions.key("key").stringType().noDefaultValue();
- assertEquals("key", keyOption.key());
- assertEquals(null, keyOption.defaultValue());
- assertEquals(false, keyOption.hasDefaultValue());
- ConfigOption<Boolean> booleanOption =
- ConfigOptions.key("boolean").booleanType().defaultValue(true);
- assertEquals(true, booleanOption.defaultValue());
- }
-
- @Test
- public void testConfigOptionAddDesc() {
- ConfigOption<String> keyOption = ConfigOptions.key("key")
- .description("this is a description of the key")
- .stringType()
- .noDefaultValue();
- assertEquals("key", keyOption.key());
- assertEquals(null, keyOption.defaultValue());
- assertEquals(false, keyOption.hasDefaultValue());
- ConfigOption<Boolean> booleanOption =
- ConfigOptions.key("boolean").booleanType().defaultValue(true);
- assertEquals(true, booleanOption.defaultValue());
- assertEquals("this is a description of the key",
keyOption.description());
- }
-}
diff --git
a/auron-core/src/test/java/org/apache/auron/configuration/MockAuronConfiguration.java
b/auron-core/src/test/java/org/apache/auron/configuration/MockAuronConfiguration.java
index eb14c38c..ec6098cc 100644
---
a/auron-core/src/test/java/org/apache/auron/configuration/MockAuronConfiguration.java
+++
b/auron-core/src/test/java/org/apache/auron/configuration/MockAuronConfiguration.java
@@ -16,53 +16,51 @@
*/
package org.apache.auron.configuration;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Optional;
public class MockAuronConfiguration extends AuronConfiguration {
- public static final ConfigOption<String> STRING_CONFIG_OPTION =
- ConfigOptions.key("string").stringType().defaultValue("zm");
+ // Basic configuration options with descriptions
+ public static final ConfigOption<String> STRING_CONFIG_OPTION = new
ConfigOption<>(String.class)
+ .withKey("string")
+ .withDescription("A string configuration option for testing.")
+ .withDefaultValue("zm");
- public static final ConfigOption<String>
STRING_WITHOUT_DEFAULT_CONFIG_OPTION =
-
ConfigOptions.key("string_without_default").stringType().noDefaultValue();
+ public static final ConfigOption<Integer> INT_CONFIG_OPTION = new
ConfigOption<>(Integer.class)
+ .withKey("int")
+ .withDescription("An integer configuration option for testing.")
+ .withDefaultValue(1);
- public static final ConfigOption<Integer> INT_CONFIG_OPTION =
- ConfigOptions.key("int").intType().defaultValue(1);
+ public static final ConfigOption<Long> LONG_CONFIG_OPTION = new
ConfigOption<>(Long.class)
+ .withKey("long")
+ .withDescription("A long configuration option for testing.")
+ .withDefaultValue(1L);
- public static final ConfigOption<Long> LONG_CONFIG_OPTION =
- ConfigOptions.key("long").longType().defaultValue(1L);
+ public static final ConfigOption<Boolean> BOOLEAN_CONFIG_OPTION = new
ConfigOption<>(Boolean.class)
+ .withKey("boolean")
+ .withDescription("A boolean configuration option for testing.")
+ .withDefaultValue(true);
- public static final ConfigOption<Boolean> BOOLEAN_CONFIG_OPTION =
- ConfigOptions.key("boolean").booleanType().defaultValue(true);
+ public static final ConfigOption<Double> DOUBLE_CONFIG_OPTION = new
ConfigOption<>(Double.class)
+ .withKey("double")
+ .withDescription("A double configuration option for testing.")
+ .withDefaultValue(1.0);
- public static final ConfigOption<Double> DOUBLE_CONFIG_OPTION =
- ConfigOptions.key("double").doubleType().defaultValue(1.0);
+ public static final ConfigOption<Float> FLOAT_CONFIG_OPTION = new
ConfigOption<>(Float.class)
+ .withKey("float")
+ .withDescription("A float configuration option for testing.")
+ .withDefaultValue(1.0f);
- public static final ConfigOption<Float> FLOAT_CONFIG_OPTION =
- ConfigOptions.key("float").floatType().defaultValue(1.0f);
-
- public static final ConfigOption<Integer>
INT_WITH_DYNAMIC_DEFAULT_CONFIG_OPTION = ConfigOptions.key(
- "int_with_dynamic_default")
- .intType()
- .dynamicDefaultValue(config ->
config.getInteger(INT_CONFIG_OPTION) * 5);
-
- private Map<String, Object> configMap = new HashMap<>();
+ public static final ConfigOption<Integer>
INT_WITH_DYNAMIC_DEFAULT_CONFIG_OPTION = new ConfigOption<>(Integer.class)
+ .withKey("int_with_dynamic_default")
+ .withDescription("An integer configuration option with dynamic
default value.")
+ .withDynamicDefaultValue(
+ config -> config.getOptional(INT_CONFIG_OPTION).orElse(1)
* 5);
public MockAuronConfiguration() {}
- public void addConfig(String key, Object value) {
- configMap.put(key, value);
- }
-
@Override
public <T> Optional<T> getOptional(ConfigOption<T> option) {
- return Optional.ofNullable((T) configMap.getOrDefault(option.key(),
getOptionDefaultValue(option)));
- }
-
- @Override
- public <T> Optional<T> getOptional(String key) {
- return Optional.ofNullable((T) configMap.get(key));
+ return Optional.empty(); // always use default value
}
}
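
With the mock now returning Optional.empty() unconditionally, tests exercise only the default and dynamic-default paths (the per-key addConfig map is gone). A small sketch of the resulting behaviour, assuming getOptionDefaultValue evaluates the dynamic default function as the old AuronConfigurationTest relied on:

    MockAuronConfiguration config = new MockAuronConfiguration();
    int plain   = config.get(MockAuronConfiguration.INT_CONFIG_OPTION);                       // 1, the static default
    int dynamic = config.get(MockAuronConfiguration.INT_WITH_DYNAMIC_DEFAULT_CONFIG_OPTION);  // 1 * 5 = 5, the dynamic default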
diff --git a/native-engine/auron-jni-bridge/src/conf.rs
b/native-engine/auron-jni-bridge/src/conf.rs
index 383596d6..351432f9 100644
--- a/native-engine/auron-jni-bridge/src/conf.rs
+++ b/native-engine/auron-jni-bridge/src/conf.rs
@@ -67,7 +67,7 @@ pub trait BooleanConf {
fn value(&self) -> Result<bool> {
ensure_jni_bridge_inited()?;
let key = jni_new_string!(self.key())?;
- jni_call_static!(AuronConf.booleanConf(key.as_obj()) -> bool)
+ jni_call_static!(JniBridge.booleanConf(key.as_obj()) -> bool)
}
}
@@ -76,7 +76,7 @@ pub trait IntConf {
fn value(&self) -> Result<i32> {
ensure_jni_bridge_inited()?;
let key = jni_new_string!(self.key())?;
- jni_call_static!(AuronConf.intConf(key.as_obj()) -> i32)
+ jni_call_static!(JniBridge.intConf(key.as_obj()) -> i32)
}
}
@@ -85,7 +85,7 @@ pub trait LongConf {
fn value(&self) -> Result<i64> {
ensure_jni_bridge_inited()?;
let key = jni_new_string!(self.key())?;
- jni_call_static!(AuronConf.longConf(key.as_obj()) -> i64)
+ jni_call_static!(JniBridge.longConf(key.as_obj()) -> i64)
}
}
@@ -94,7 +94,7 @@ pub trait DoubleConf {
fn value(&self) -> Result<f64> {
ensure_jni_bridge_inited()?;
let key = jni_new_string!(self.key())?;
- jni_call_static!(AuronConf.doubleConf(key.as_obj()) -> f64)
+ jni_call_static!(JniBridge.doubleConf(key.as_obj()) -> f64)
}
}
@@ -104,7 +104,7 @@ pub trait StringConf {
ensure_jni_bridge_inited()?;
let key = jni_new_string!(self.key())?;
let value = jni_get_string!(
- jni_call_static!(AuronConf.stringConf(key.as_obj()) -> JObject)?
+ jni_call_static!(JniBridge.stringConf(key.as_obj()) -> JObject)?
.as_obj()
.into()
)?;
diff --git a/native-engine/auron-jni-bridge/src/jni_bridge.rs
b/native-engine/auron-jni-bridge/src/jni_bridge.rs
index 5b76d829..f058901a 100644
--- a/native-engine/auron-jni-bridge/src/jni_bridge.rs
+++ b/native-engine/auron-jni-bridge/src/jni_bridge.rs
@@ -447,7 +447,6 @@ pub struct JavaClasses<'a> {
pub cSparkUDAFWrapperContext: SparkUDAFWrapperContext<'a>,
pub cSparkUDTFWrapperContext: SparkUDTFWrapperContext<'a>,
pub cSparkUDAFMemTracker: SparkUDAFMemTracker<'a>,
- pub cAuronConf: AuronConf<'a>,
pub cAuronRssPartitionWriterBase: AuronRssPartitionWriterBase<'a>,
pub cAuronCallNativeWrapper: AuronCallNativeWrapper<'a>,
pub cAuronOnHeapSpillManager: AuronOnHeapSpillManager<'a>,
@@ -513,7 +512,6 @@ impl JavaClasses<'static> {
cSparkUDAFWrapperContext: SparkUDAFWrapperContext::new(env)?,
cSparkUDTFWrapperContext: SparkUDTFWrapperContext::new(env)?,
cSparkUDAFMemTracker: SparkUDAFMemTracker::new(env)?,
- cAuronConf: AuronConf::new(env)?,
cAuronRssPartitionWriterBase:
AuronRssPartitionWriterBase::new(env)?,
cAuronCallNativeWrapper: AuronCallNativeWrapper::new(env)?,
cAuronOnHeapSpillManager: AuronOnHeapSpillManager::new(env)?,
@@ -577,9 +575,18 @@ pub struct JniBridge<'a> {
pub method_getTotalMemoryLimited_ret: ReturnType,
pub method_getDirectWriteSpillToDiskFile: JStaticMethodID,
pub method_getDirectWriteSpillToDiskFile_ret: ReturnType,
-
pub method_getAuronUDFWrapperContext: JStaticMethodID,
pub method_getAuronUDFWrapperContext_ret: ReturnType,
+ pub method_intConf: JStaticMethodID,
+ pub method_intConf_ret: ReturnType,
+ pub method_longConf: JStaticMethodID,
+ pub method_longConf_ret: ReturnType,
+ pub method_doubleConf: JStaticMethodID,
+ pub method_doubleConf_ret: ReturnType,
+ pub method_booleanConf: JStaticMethodID,
+ pub method_booleanConf_ret: ReturnType,
+ pub method_stringConf: JStaticMethodID,
+ pub method_stringConf_ret: ReturnType,
}
impl<'a> JniBridge<'a> {
pub const SIG_TYPE: &'static str = "org/apache/auron/jni/JniBridge";
@@ -663,6 +670,36 @@ impl<'a> JniBridge<'a> {
"(Ljava/nio/ByteBuffer;)Lorg/apache/auron/functions/AuronUDFWrapperContext;",
)?,
method_getAuronUDFWrapperContext_ret: ReturnType::Object,
+ method_intConf: env.get_static_method_id(
+ class,
+ "intConf",
+ "(Ljava/lang/String;)I",
+ )?,
+ method_intConf_ret: ReturnType::Primitive(Primitive::Int),
+ method_longConf: env.get_static_method_id(
+ class,
+ "longConf",
+ "(Ljava/lang/String;)J",
+ )?,
+ method_longConf_ret: ReturnType::Primitive(Primitive::Long),
+ method_doubleConf: env.get_static_method_id(
+ class,
+ "doubleConf",
+ "(Ljava/lang/String;)D",
+ )?,
+ method_doubleConf_ret: ReturnType::Primitive(Primitive::Double),
+ method_booleanConf: env.get_static_method_id(
+ class,
+ "booleanConf",
+ "(Ljava/lang/String;)Z",
+ )?,
+ method_booleanConf_ret: ReturnType::Primitive(Primitive::Boolean),
+ method_stringConf: env.get_static_method_id(
+ class,
+ "stringConf",
+ "(Ljava/lang/String;)Ljava/lang/String;",
+ )?,
+ method_stringConf_ret: ReturnType::Object,
})
}
}
@@ -1121,58 +1158,6 @@ impl<'a> SparkMetricNode<'a> {
}
}
-#[allow(non_snake_case)]
-pub struct AuronConf<'a> {
- pub class: JClass<'a>,
- pub method_booleanConf: JStaticMethodID,
- pub method_booleanConf_ret: ReturnType,
- pub method_intConf: JStaticMethodID,
- pub method_intConf_ret: ReturnType,
- pub method_longConf: JStaticMethodID,
- pub method_longConf_ret: ReturnType,
- pub method_doubleConf: JStaticMethodID,
- pub method_doubleConf_ret: ReturnType,
- pub method_stringConf: JStaticMethodID,
- pub method_stringConf_ret: ReturnType,
-}
-
-impl<'a> AuronConf<'_> {
- pub const SIG_TYPE: &'static str = "org/apache/spark/sql/auron/AuronConf";
-
- pub fn new(env: &JNIEnv<'a>) -> JniResult<AuronConf<'a>> {
- let class = get_global_jclass(env, Self::SIG_TYPE)?;
- Ok(AuronConf {
- class,
- method_booleanConf: env.get_static_method_id(
- class,
- "booleanConf",
- "(Ljava/lang/String;)Z",
- )?,
- method_booleanConf_ret: ReturnType::Primitive(Primitive::Boolean),
- method_intConf: env.get_static_method_id(class, "intConf",
"(Ljava/lang/String;)I")?,
- method_intConf_ret: ReturnType::Primitive(Primitive::Int),
- method_longConf: env.get_static_method_id(
- class,
- "longConf",
- "(Ljava/lang/String;)J",
- )?,
- method_longConf_ret: ReturnType::Primitive(Primitive::Long),
- method_doubleConf: env.get_static_method_id(
- class,
- "doubleConf",
- "(Ljava/lang/String;)D",
- )?,
- method_doubleConf_ret: ReturnType::Primitive(Primitive::Double),
- method_stringConf: env.get_static_method_id(
- class,
- "stringConf",
- "(Ljava/lang/String;)Ljava/lang/String;",
- )?,
- method_stringConf_ret: ReturnType::Object,
- })
- }
-}
-
#[allow(non_snake_case)]
pub struct AuronRssPartitionWriterBase<'a> {
pub class: JClass<'a>,
diff --git
a/spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInterceptor.java
b/spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInterceptor.java
index 8d7d0dd5..3cc6cbd1 100644
---
a/spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInterceptor.java
+++
b/spark-extension-shims-spark/src/main/java/org/apache/spark/sql/auron/ForceApplyShuffledHashJoinInterceptor.java
@@ -18,6 +18,7 @@ package org.apache.spark.sql.auron;
import net.bytebuddy.implementation.bind.annotation.Argument;
import net.bytebuddy.implementation.bind.annotation.RuntimeType;
+import org.apache.auron.spark.configuration.SparkAuronConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,6 +28,6 @@ public class ForceApplyShuffledHashJoinInterceptor {
@RuntimeType
public static Object intercept(@Argument(0) Object conf) {
logger.debug("calling JoinSelectionHelper.forceApplyShuffledHashJoin()
intercepted by auron");
- return AuronConf.FORCE_SHUFFLED_HASH_JOIN.booleanConf();
+ return SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN.get();
}
}
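
The call-site migration used here (and in the Spark shims below) is mechanical: the per-type AuronConf accessors become the static ConfigOption fields on SparkAuronConfiguration plus get():

    // before
    boolean force = AuronConf.FORCE_SHUFFLED_HASH_JOIN.booleanConf();
    // after
    boolean force = SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN.get();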
diff --git
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
index c3a1861d..6353d7cb 100644
---
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
+++
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
@@ -107,6 +107,7 @@ import org.apache.spark.storage.FileSegment
import org.apache.auron.{protobuf => pb, sparkver}
import org.apache.auron.common.AuronBuildInfo
import org.apache.auron.metric.SparkMetricNode
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
import org.apache.auron.spark.ui.AuronBuildInfoEvent
class ShimsImpl extends Shims with Logging {
@@ -128,7 +129,7 @@ class ShimsImpl extends Shims with Logging {
override def initExtension(): Unit = {
ValidateSparkPlanInjector.inject()
- if (AuronConf.FORCE_SHUFFLED_HASH_JOIN.booleanConf()) {
+ if (SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN.get()) {
ForceApplyShuffledHashJoinInjector.inject()
}
@@ -141,19 +142,18 @@ class ShimsImpl extends Shims with Logging {
@sparkver("3.0 / 3.1")
override def initExtension(): Unit = {
- if (AuronConf.FORCE_SHUFFLED_HASH_JOIN.booleanConf()) {
- logWarning(s"${AuronConf.FORCE_SHUFFLED_HASH_JOIN.key} is not supported
in $shimVersion")
+ if (SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN.get()) {
+ logWarning(
+ s"${SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN.key} is not
supported in $shimVersion")
}
}
// set Auron spark ui if spark.auron.ui.enabled is true
override def onApplyingExtension(): Unit = {
- logInfo(
- " onApplyingExtension get ui_enabled : " + SparkEnv.get.conf
- .get(AuronConf.UI_ENABLED.key, "true"))
+ logInfo(s"onApplyingExtension get ui_enabled:
${SparkAuronConfiguration.UI_ENABLED.get()}")
- if (SparkEnv.get.conf.get(AuronConf.UI_ENABLED.key,
"true").equals("true")) {
+ if (SparkAuronConfiguration.UI_ENABLED.get()) {
val sparkContext = SparkContext.getActive.getOrElse {
throw new IllegalStateException("No active spark context found that
should not happen")
}
diff --git
a/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala
b/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala
index 3e990978..e82eb78f 100644
---
a/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala
+++
b/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala
@@ -17,9 +17,9 @@
package org.apache.auron
import org.apache.spark.sql.{AuronQueryTest, Row}
-import org.apache.spark.sql.auron.AuronConf
import org.apache.spark.sql.execution.joins.auron.plan.NativeBroadcastJoinExec
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
import org.apache.auron.util.AuronTestUtils
class AuronQuerySuite extends AuronQueryTest with BaseAuronSQLSuite with
AuronSQLTestHelper {
@@ -197,7 +197,7 @@ class AuronQuerySuite extends AuronQueryTest with
BaseAuronSQLSuite with AuronSQ
if (AuronTestUtils.isSparkV32OrGreater) {
Seq(true, false).foreach { forcePositionalEvolution =>
withEnvConf(
- AuronConf.ORC_FORCE_POSITIONAL_EVOLUTION.key ->
forcePositionalEvolution.toString) {
+ SparkAuronConfiguration.ORC_FORCE_POSITIONAL_EVOLUTION.key ->
forcePositionalEvolution.toString) {
withTempPath { f =>
val path = f.getCanonicalPath
Seq[(Integer, Integer)]((1, 2), (3, 4), (5, 6), (null, null))
@@ -220,7 +220,7 @@ class AuronQuerySuite extends AuronQueryTest with
BaseAuronSQLSuite with AuronSQ
if (AuronTestUtils.isSparkV32OrGreater) {
Seq(true, false).foreach { forcePositionalEvolution =>
withEnvConf(
- AuronConf.ORC_FORCE_POSITIONAL_EVOLUTION.key ->
forcePositionalEvolution.toString) {
+ SparkAuronConfiguration.ORC_FORCE_POSITIONAL_EVOLUTION.key ->
forcePositionalEvolution.toString) {
withTempPath { f =>
val path = f.getCanonicalPath
Seq[(Integer, Integer, Integer)]((1, 2, 1), (3, 4, 2), (5, 6, 3),
(null, null, 4))
diff --git
a/spark-extension-shims-spark/src/test/scala/org/apache/auron/NativeConvertersSuite.scala
b/spark-extension-shims-spark/src/test/scala/org/apache/auron/NativeConvertersSuite.scala
index dc96731c..8bc93190 100644
---
a/spark-extension-shims-spark/src/test/scala/org/apache/auron/NativeConvertersSuite.scala
+++
b/spark-extension-shims-spark/src/test/scala/org/apache/auron/NativeConvertersSuite.scala
@@ -17,7 +17,7 @@
package org.apache.auron
import org.apache.spark.sql.AuronQueryTest
-import org.apache.spark.sql.auron.{AuronConf, NativeConverters}
+import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType,
StringType}
@@ -51,25 +51,25 @@ class NativeConvertersSuite
}
test("cast from string to numeric adds trim wrapper before native cast when
enabled") {
- withSQLConf(AuronConf.CAST_STRING_TRIM_ENABLE.key -> "true") {
+ withSQLConf("spark.auron.cast.trimString" -> "true") {
assertTrimmedCast(" 42 ", IntegerType)
}
}
test("cast from string to boolean adds trim wrapper before native cast when
enabled") {
- withSQLConf(AuronConf.CAST_STRING_TRIM_ENABLE.key -> "true") {
+ withSQLConf("spark.auron.cast.trimString" -> "true") {
assertTrimmedCast(" true ", BooleanType)
}
}
test("cast trim disabled via auron conf") {
- withSQLConf(AuronConf.CAST_STRING_TRIM_ENABLE.key -> "false") {
+ withSQLConf("spark.auron.cast.trimString" -> "false") {
assertNonTrimmedCast(" 42 ", IntegerType)
}
}
test("cast trim disabled via auron conf for boolean cast") {
- withSQLConf(AuronConf.CAST_STRING_TRIM_ENABLE.key -> "false") {
+ withSQLConf("spark.auron.cast.trimString" -> "false") {
assertNonTrimmedCast(" true ", BooleanType)
}
}
diff --git
a/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java
b/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java
index c88f8457..005e9753 100644
--- a/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java
+++ b/spark-extension/src/main/java/org/apache/auron/jni/SparkAuronAdaptor.java
@@ -28,7 +28,6 @@ import org.apache.auron.memory.OnHeapSpillManager;
import org.apache.auron.spark.configuration.SparkAuronConfiguration;
import org.apache.auron.spark.sql.SparkAuronUDFWrapperContext;
import org.apache.spark.SparkEnv;
-import org.apache.spark.SparkEnv$;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.apache.spark.sql.auron.NativeHelper$;
@@ -88,7 +87,7 @@ public class SparkAuronAdaptor extends AuronAdaptor {
@Override
public AuronConfiguration getAuronConfiguration() {
- return new SparkAuronConfiguration(SparkEnv$.MODULE$.get().conf());
+ return new SparkAuronConfiguration();
}
@Override
diff --git
a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
index 8e66efb1..8846700f 100644
---
a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
+++
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java
@@ -16,15 +16,16 @@
*/
package org.apache.auron.spark.configuration;
-import static org.apache.auron.util.Preconditions.checkNotNull;
-
+import java.util.List;
import java.util.Optional;
+import java.util.function.Supplier;
import org.apache.auron.configuration.AuronConfiguration;
import org.apache.auron.configuration.ConfigOption;
-import org.apache.auron.configuration.ConfigOptions;
-import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.SparkEnv;
import org.apache.spark.internal.config.ConfigEntry;
-import org.apache.spark.internal.config.ConfigEntryWithDefault;
+import org.apache.spark.internal.config.ConfigEntryWithDefaultFunction;
+import org.apache.spark.sql.internal.SQLConf;
import scala.Option;
import scala.collection.immutable.List$;
@@ -38,283 +39,592 @@ public class SparkAuronConfiguration extends
AuronConfiguration {
// please manually add the prefix.
public static final String SPARK_PREFIX = "spark.";
- public static final ConfigOption<Boolean> UI_ENABLED =
ConfigOptions.key("auron.ui.enabled")
- .description("support spark.auron.ui.enabled.")
- .booleanType()
- .defaultValue(true);
-
- public static final ConfigOption<Double> PROCESS_MEMORY_FRACTION =
ConfigOptions.key(
- "auron.process.vmrss.memoryFraction")
- .description("suggested fraction of process total memory (on-heap
and off-heap). "
- + "this limit is for process's resident memory usage.")
- .doubleType()
- .defaultValue(0.9);
-
- public static final ConfigOption<Boolean> CASE_CONVERT_FUNCTIONS_ENABLE =
ConfigOptions.key(
- "auron.enable.caseconvert.functions")
- .description("enable converting upper/lower functions to native,
special cases may provide different, "
- + "outputs from spark due to different unicode versions. ")
- .booleanType()
- .defaultValue(true);
-
- public static final ConfigOption<Boolean> INPUT_BATCH_STATISTICS_ENABLE =
ConfigOptions.key(
- "auron.enableInputBatchStatistics")
- .description("enable extra metrics of input batch statistics. ")
- .booleanType()
- .defaultValue(true);
-
- public static final ConfigOption<Boolean> UDAF_FALLBACK_ENABLE =
ConfigOptions.key("auron.udafFallback.enable")
- .description("supports UDAF and other aggregate functions not
implemented. ")
- .booleanType()
- .defaultValue(true);
-
- public static final ConfigOption<Integer> SUGGESTED_UDAF_ROW_MEM_USAGE =
ConfigOptions.key(
- "auron.suggested.udaf.memUsedSize")
- .description("TypedImperativeAggregate one row mem use size. ")
- .intType()
- .defaultValue(64);
-
- public static final ConfigOption<Integer>
UDAF_FALLBACK_NUM_UDAFS_TRIGGER_SORT_AGG = ConfigOptions.key(
- "auron.udafFallback.num.udafs.trigger.sortAgg")
- .description(
- "number of udafs to trigger sort-based aggregation, by
default, all aggs containing udafs are converted to sort-based.")
- .intType()
- .defaultValue(1);
-
- public static final ConfigOption<Integer> UDAF_FALLBACK_ESTIM_ROW_SIZE =
ConfigOptions.key(
- "auron.udafFallback.typedImperativeEstimatedRowSize")
- .description("TypedImperativeAggregate one row mem use size.")
- .intType()
- .defaultValue(256);
-
- public static final ConfigOption<Boolean> CAST_STRING_TRIM_ENABLE =
ConfigOptions.key("auron.cast.trimString")
- .description("enable trimming string inputs before casting to
numeric/boolean types. ")
- .booleanType()
- .defaultValue(true);
-
- public static final ConfigOption<Boolean> IGNORE_CORRUPTED_FILES =
ConfigOptions.key("files.ignoreCorruptFiles")
- .description("ignore corrupted input files. ")
- .booleanType()
- .defaultValue(false);
-
- public static final ConfigOption<Boolean> PARTIAL_AGG_SKIPPING_ENABLE =
ConfigOptions.key(
- "auron.partialAggSkipping.enable")
- .description("enable partial aggregate skipping (see
https://github.com/apache/auron/issues/327). ")
- .booleanType()
- .defaultValue(true);
-
- public static final ConfigOption<Double> PARTIAL_AGG_SKIPPING_RATIO =
ConfigOptions.key(
- "auron.partialAggSkipping.ratio")
- .description("partial aggregate skipping ratio. ")
- .doubleType()
- .defaultValue(0.9);
-
- public static final ConfigOption<Integer> PARTIAL_AGG_SKIPPING_MIN_ROWS =
ConfigOptions.key(
- "auron.partialAggSkipping.minRows")
- .description("minimum number of rows to trigger partial aggregate
skipping.")
- .intType()
- .dynamicDefaultValue(
+ public static final ConfigOption<Boolean> AURON_ENABLED = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enabled")
+ .addAltKey("auron.enable")
+ .withCategory("Runtime Configuration")
+ .withDescription("Enable Spark Auron support to accelerate query
execution with native implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> UI_ENABLED = new
ConfigOption<>(Boolean.class)
+ .withKey("auron.ui.enabled")
+ .addAltKey("auron.ui.enable")
+ .withCategory("Runtime Configuration")
+ .withDescription(
+ "Enable Spark Auron UI support to display Auron-specific
metrics and statistics in Spark UI.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Double> PROCESS_MEMORY_FRACTION = new
ConfigOption<>(Double.class)
+ .withKey("auron.process.vmrss.memoryFraction")
+ .withCategory("Runtime Configuration")
+ .withDescription(
+ "Suggested fraction of process total memory (on-heap and
off-heap) to use for resident memory. "
+ + "This controls the memory limit for the
process's virtual memory resident set size (VMRSS).")
+ .withDefaultValue(0.9);
+
+ public static final ConfigOption<Boolean> CASE_CONVERT_FUNCTIONS_ENABLE =
new ConfigOption<>(Boolean.class)
+ .withKey("auron.enable.caseconvert.functions")
+ .withCategory("Expression/Function Supports")
+ .withDescription(
+ "Enable converting UPPER/LOWER string functions to native
implementations for better performance. "
+ + "Note: May produce different outputs from Spark
in special cases due to different Unicode versions.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> INPUT_BATCH_STATISTICS_ENABLE =
new ConfigOption<>(Boolean.class)
+ .withKey("auron.enableInputBatchStatistics")
+ .withCategory("Runtime Configuration")
+ .withDescription(
+ "Enable collection of additional metrics for input batch
statistics to monitor data processing performance.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> UDAF_FALLBACK_ENABLE = new
ConfigOption<>(Boolean.class)
+ .withKey("auron.udafFallback.enable")
+ .withCategory("UDAF Fallback")
+ .withDescription(
+ "Enable fallback support for UDAF and other aggregate
functions that are not implemented in Auron, "
+ + "allowing them to be executed using Spark's
native implementation.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Integer> SUGGESTED_UDAF_ROW_MEM_USAGE =
new ConfigOption<>(Integer.class)
+ .withKey("auron.suggested.udaf.memUsedSize")
+ .withCategory("UDAF Fallback")
+ .withDescription("Suggested memory usage size per row for
TypedImperativeAggregate functions in bytes. "
+ + "This helps in memory allocation planning for UDAF
operations.")
+ .withDefaultValue(64);
+
+ public static final ConfigOption<Integer>
UDAF_FALLBACK_NUM_UDAFS_TRIGGER_SORT_AGG = new ConfigOption<>(
+ Integer.class)
+ .withKey("auron.udafFallback.num.udafs.trigger.sortAgg")
+ .withCategory("UDAF Fallback")
+ .withDescription(
+ "Number of UDAFs to trigger sort-based aggregation, by
default, all aggs containing udafs are converted to sort-based.")
+ .withDefaultValue(1);
+
+ public static final ConfigOption<Integer> UDAF_FALLBACK_ESTIM_ROW_SIZE =
new ConfigOption<>(Integer.class)
+ .withKey("auron.udafFallback.typedImperativeEstimatedRowSize")
+ .withCategory("UDAF Fallback")
+ .withDescription("Estimated memory size per row for
TypedImperativeAggregate functions in bytes. "
+ + "This estimation is used for memory planning and
allocation during UDAF fallback operations.")
+ .withDefaultValue(256);
+
+ public static final ConfigOption<Boolean> CAST_STRING_TRIM_ENABLE = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.cast.trimString")
+ .withCategory("Expression/Function Supports")
+ .withDescription(
+ "Enable automatic trimming of whitespace from string
inputs before casting to numeric or boolean types. "
+ + "This helps prevent casting errors due to
leading/trailing whitespace.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> IGNORE_CORRUPTED_FILES = new
ConfigOption<>(Boolean.class)
+ .withKey("auron.files.ignoreCorruptFiles")
+ .withCategory("Data Sources")
+ .withDescription("Ignore corrupted input files, defaults to
spark.sql.files.ignoreCorruptFiles")
+ .withDynamicDefaultValue(
+ conf ->
SparkEnv.get().conf().getBoolean("spark.sql.files.ignoreCorruptFiles", false));
+
+ public static final ConfigOption<Boolean> PARTIAL_AGG_SKIPPING_ENABLE =
new ConfigOption<>(Boolean.class)
+ .withKey("auron.partialAggSkipping.enable")
+ .withCategory("Partial Aggregate Skipping")
+ .withDescription(
+ "Enable partial aggregate skipping optimization to improve
performance by skipping unnecessary "
+ + "partial aggregation stages when certain
conditions are met. See issue #327 for detailed implementation.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Double> PARTIAL_AGG_SKIPPING_RATIO = new
ConfigOption<>(Double.class)
+ .withKey("auron.partialAggSkipping.ratio")
+ .withCategory("Partial Aggregate Skipping")
+ .withDescription(
+ "Threshold ratio for partial aggregate skipping
optimization. When the ratio of unique keys to total rows "
+ + "exceeds this value, partial aggregation may be
skipped to improve performance.")
+ .withDefaultValue(0.9);
+
+ public static final ConfigOption<Integer> PARTIAL_AGG_SKIPPING_MIN_ROWS =
new ConfigOption<>(Integer.class)
+ .withKey("auron.partialAggSkipping.minRows")
+ .withCategory("Partial Aggregate Skipping")
+ .withDescription("Minimum number of rows required to trigger
partial aggregate skipping optimization. "
+ + "This prevents the optimization from being applied to
very small datasets where it may not be beneficial. "
+ + "Defaults to spark.auron.batchSize * 5")
+ .withDynamicDefaultValue(
config ->
config.getOptional(AuronConfiguration.BATCH_SIZE).get() * 5);
- public static final ConfigOption<Boolean> PARTIAL_AGG_SKIPPING_SKIP_SPILL
= ConfigOptions.key(
- "auron.partialAggSkipping.skipSpill")
- .description("always skip partial aggregate when triggered
spilling. ")
- .booleanType()
- .defaultValue(false);
-
- public static final ConfigOption<Boolean> PARQUET_ENABLE_PAGE_FILTERING =
ConfigOptions.key(
- "auron.parquet.enable.pageFiltering")
- .description("parquet enable page filtering. ")
- .booleanType()
- .defaultValue(false);
-
- public static final ConfigOption<Boolean> PARQUET_ENABLE_BLOOM_FILTER =
ConfigOptions.key(
- "auron.parquet.enable.bloomFilter")
- .description("parquet enable bloom filter. ")
- .booleanType()
- .defaultValue(false);
-
- public static final ConfigOption<Integer> PARQUET_MAX_OVER_READ_SIZE =
ConfigOptions.key(
- "auron.parquet.maxOverReadSize")
- .description("parquet max over read size.")
- .intType()
- .defaultValue(16384);
-
- public static final ConfigOption<Integer> PARQUET_METADATA_CACHE_SIZE =
ConfigOptions.key(
- "auron.parquet.metadataCacheSize")
- .description("parquet metadata cache size.")
- .intType()
- .defaultValue(5);
-
- public static final ConfigOption<String> SPARK_IO_COMPRESSION_CODEC =
ConfigOptions.key("io.compression.codec")
- .description("spark io compression codec.")
- .stringType()
- .defaultValue("lz4");
-
- public static final ConfigOption<Integer> SPARK_IO_COMPRESSION_ZSTD_LEVEL
= ConfigOptions.key(
- "io.compression.zstd.level")
- .description("spark io compression zstd level.")
- .intType()
- .defaultValue(1);
-
- public static final ConfigOption<Integer> TOKIO_WORKER_THREADS_PER_CPU =
ConfigOptions.key(
- "auron.tokio.worker.threads.per.cpu")
- .description("tokio worker threads per cpu (spark.task.cpus), 0
for auto detection.")
- .intType()
- .defaultValue(0);
-
- public static final ConfigOption<Integer> SPARK_TASK_CPUS =
ConfigOptions.key("task.cpus")
- .description("number of cpus per task.")
- .intType()
- .defaultValue(1);
-
- public static final ConfigOption<Boolean> FORCE_SHUFFLED_HASH_JOIN =
ConfigOptions.key(
- "auron.forceShuffledHashJoin")
- .description("replace all sort-merge join to shuffled-hash join,
only used for benchmarking. ")
- .booleanType()
- .defaultValue(false);
-
- public static final ConfigOption<Integer>
SHUFFLE_COMPRESSION_TARGET_BUF_SIZE = ConfigOptions.key(
- "auron.shuffle.compression.targetBufSize")
- .description("shuffle compression target buffer size, default is
4MB.")
- .intType()
- .defaultValue(4194304);
-
- public static final ConfigOption<String> SPILL_COMPRESSION_CODEC =
ConfigOptions.key(
- "auron.spill.compression.codec")
- .description("spark spill compression codec.")
- .stringType()
- .defaultValue("lz4");
-
- public static final ConfigOption<Boolean> SMJ_FALLBACK_ENABLE =
ConfigOptions.key("auron.smjfallback.enable")
- .description("enable hash join falling back to sort merge join
when hash table is too big. ")
- .booleanType()
- .defaultValue(false);
-
- public static final ConfigOption<Integer> SMJ_FALLBACK_ROWS_THRESHOLD =
ConfigOptions.key(
- "auron.smjfallback.rows.threshold")
- .description("smj fallback threshold.")
- .intType()
- .defaultValue(10000000);
-
- public static final ConfigOption<Integer> SMJ_FALLBACK_MEM_SIZE_THRESHOLD
= ConfigOptions.key(
- "auron.smjfallback.mem.threshold")
- .description("smj fallback mem threshold.")
- .intType()
- .defaultValue(134217728);
-
- public static final ConfigOption<Double> ON_HEAP_SPILL_MEM_FRACTION =
ConfigOptions.key(
- "auron.onHeapSpill.memoryFraction")
- .description("max memory fraction of on-heap spills. ")
- .doubleType()
- .defaultValue(0.9);
-
- public static final ConfigOption<Integer> SUGGESTED_BATCH_MEM_SIZE =
ConfigOptions.key(
- "auron.suggested.batch.memSize")
- .description("suggested memory size for record batch.")
- .intType()
- .defaultValue(8388608);
-
- public static final ConfigOption<Boolean> PARSE_JSON_ERROR_FALLBACK =
ConfigOptions.key(
- "auron.parseJsonError.fallback")
- .description("fallback to UDFJson when error parsing json in
native implementation. ")
- .booleanType()
- .defaultValue(true);
-
- public static final ConfigOption<Integer>
SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE = ConfigOptions.key(
- "auron.suggested.batch.memSize.multiwayMerging")
- .description("suggested memory size for k-way merging use smaller
batch memory size for "
- + "k-way merging since there will be multiple batches in
memory at the same time.")
- .intType()
- .defaultValue(1048576);
- public static final ConfigOption<Boolean> ORC_FORCE_POSITIONAL_EVOLUTION =
ConfigOptions.key(
- "auron.orc.force.positional.evolution")
- .description("orc force positional evolution. ")
- .booleanType()
- .defaultValue(false);
- public static final ConfigOption<Boolean> ORC_TIMESTAMP_USE_MICROSECOND =
ConfigOptions.key(
- "auron.orc.timestamp.use.microsecond")
- .description("use microsecond precision when reading ORC timestamp
columns. ")
- .booleanType()
- .defaultValue(false);
- public static final ConfigOption<Boolean> ORC_SCHEMA_CASE_SENSITIVE =
ConfigOptions.key(
- "auron.orc.schema.caseSensitive.enable")
- .description("whether ORC file schema matching distinguishes
between uppercase and lowercase. ")
- .booleanType()
- .defaultValue(false);
-
- public static final ConfigOption<Boolean> FORCE_SHORT_CIRCUIT_AND_OR =
ConfigOptions.key(
- "auron.forceShortCircuitAndOr")
- .description("force using short-circuit evaluation
(PhysicalSCAndExprNode/PhysicalSCOrExprNode) "
- + "for And/Or expressions, regardless of whether rhs
contains HiveUDF. ")
- .booleanType()
- .defaultValue(false);
-
- private final SparkConf sparkConf;
-
- public SparkAuronConfiguration(SparkConf conf) {
- this.sparkConf = checkNotNull(conf, "spark conf cannot be null");
- }
+ public static final ConfigOption<Boolean> PARTIAL_AGG_SKIPPING_SKIP_SPILL
= new ConfigOption<>(Boolean.class)
+ .withKey("auron.partialAggSkipping.skipSpill")
+ .withCategory("Partial Aggregate Skipping")
+ .withDescription("Always skip partial aggregation when spilling is
triggered to prevent memory pressure. "
+ + "When enabled, the system will bypass partial
aggregation stages if memory spilling occurs, "
+ + "potentially trading off some optimization for memory
stability.")
+ .withDefaultValue(false);
+
+ public static final ConfigOption<Boolean> PARQUET_ENABLE_PAGE_FILTERING =
new ConfigOption<>(Boolean.class)
+ .withKey("auron.parquet.enable.pageFiltering")
+ .withCategory("Data Sources")
+ .withDescription(
+ "Enable Parquet page-level filtering to skip reading
unnecessary data pages during query execution. "
+ + "This optimization can significantly improve
read performance by avoiding I/O for pages that don't match filter predicates.")
+ .withDefaultValue(false);
+
+ public static final ConfigOption<Boolean> PARQUET_ENABLE_BLOOM_FILTER =
new ConfigOption<>(Boolean.class)
+ .withKey("auron.parquet.enable.bloomFilter")
+ .withCategory("Data Sources")
+ .withDescription(
+ "Enable Parquet bloom filter support for efficient
equality predicate filtering. "
+ + "Bloom filters can quickly determine if a value
might exist in a data block, reducing unnecessary I/O operations.")
+ .withDefaultValue(false);
+
+ public static final ConfigOption<Integer> PARQUET_MAX_OVER_READ_SIZE = new
ConfigOption<>(Integer.class)
+ .withKey("auron.parquet.maxOverReadSize")
+ .withCategory("Data Sources")
+ .withDescription(
+ "Maximum over-read size in bytes for Parquet file
operations. This controls how much extra data "
+ + "can be read beyond the required data to
optimize I/O operations and improve read performance.")
+ .withDefaultValue(16384);
+
+ public static final ConfigOption<Integer> PARQUET_METADATA_CACHE_SIZE =
new ConfigOption<>(Integer.class)
+ .withKey("auron.parquet.metadataCacheSize")
+ .withCategory("Data Sources")
+ .withDescription("Size of the Parquet metadata cache in number of
entries. This cache stores file metadata "
+ + "to avoid repeated metadata reads and improve query
performance for frequently accessed files.")
+ .withDefaultValue(5);
+
+ public static final ConfigOption<String> SPARK_IO_COMPRESSION_CODEC = new
ConfigOption<>(String.class)
+ .withKey("io.compression.codec")
+ .withCategory("Runtime Configuration")
+ .withDescription(
+ "Compression codec used for Spark I/O operations. Common
options include lz4, snappy, gzip, and zstd. "
+ + "The choice of codec affects both compression
ratio and decompression speed.")
+ .withDynamicDefaultValue(_conf ->
SparkEnv.get().conf().get("spark.io.compression.codec", "lz4"));
+
+ public static final ConfigOption<Integer> SPARK_IO_COMPRESSION_ZSTD_LEVEL
= new ConfigOption<>(Integer.class)
+ .withKey("io.compression.zstd.level")
+ .withCategory("Runtime Configuration")
+ .withDescription("Compression level for Zstandard (zstd)
compression codec used in Spark I/O operations. "
+ + "Valid values range from 1 (fastest) to 22 (highest
compression). Higher levels provide better compression "
+ + "but require more CPU time and memory.")
+ .withDynamicDefaultValue(_conf ->
SparkEnv.get().conf().getInt("spark.io.compression.zstd.level", 1));
+
+ public static final ConfigOption<Integer> TOKIO_WORKER_THREADS_PER_CPU =
new ConfigOption<>(Integer.class)
+ .withKey("auron.tokio.worker.threads.per.cpu")
+ .withCategory("Runtime Configuration")
+ .withDescription(
+ "Number of Tokio worker threads to create per CPU core
(spark.task.cpus). Set to 0 for automatic detection "
+ + "based on available CPU cores. This setting
controls the thread pool size for Tokio-based asynchronous operations.")
+ .withDefaultValue(0);
+
+ public static final ConfigOption<Integer> SPARK_TASK_CPUS = new
ConfigOption<>(Integer.class)
+ .withKey("task.cpus")
+ .withCategory("Runtime Configuration")
+ .withDescription(
+ "Number of CPU cores allocated per Spark task. This
setting determines the parallelism level "
+ + "for individual tasks and affects resource
allocation and task scheduling. "
+ + "Defaults to spark.task.cpus.")
+ .withDynamicDefaultValue(_conf ->
SparkEnv.get().conf().getInt("spark.task.cpus", 1));
+
+ public static final ConfigOption<Boolean> FORCE_SHUFFLED_HASH_JOIN = new
ConfigOption<>(Boolean.class)
+ .withKey("auron.forceShuffledHashJoin")
+ .withCategory("Operator Supports")
+ .withDescription(
+ "Force replacement of all sort-merge joins with
shuffled-hash joins for performance comparison and benchmarking. "
+ + "This setting is primarily used for testing and
performance analysis, as different join strategies may be optimal "
+ + "for different data distributions and query
patterns.")
+ .withDefaultValue(false);
+
+ public static final ConfigOption<Integer>
SHUFFLE_COMPRESSION_TARGET_BUF_SIZE = new ConfigOption<>(Integer.class)
+ .withKey("auron.shuffle.compression.targetBufSize")
+ .withCategory("Runtime Configuration")
+ .withDescription(
+ "Target buffer size in bytes for shuffle compression
operations. This setting controls the buffer size "
+ + "used during shuffle data compression, affecting
both compression efficiency and memory usage. Default is 4MB (4,194,304
bytes).")
+ .withDefaultValue(4194304);
+
+ public static final ConfigOption<String> SPILL_COMPRESSION_CODEC = new
ConfigOption<>(String.class)
+ .withKey("auron.spill.compression.codec")
+ .withCategory("Runtime Configuration")
+ .withDescription(
+ "Compression codec used for Spark spill operations when
data is written to disk due to memory pressure. "
+ + "Common options include lz4, snappy, and gzip.
The choice affects both spill performance and disk space usage.")
+ .withDefaultValue("lz4");
+
+ public static final ConfigOption<Boolean> SMJ_FALLBACK_ENABLE = new
ConfigOption<>(Boolean.class)
+ .withKey("auron.smjfallback.enable")
+ .withCategory("Operator Supports")
+ .withDescription(
+ "Enable fallback from hash join to sort-merge join when
the hash table becomes too large to fit in memory. "
+ + "This prevents out-of-memory errors by switching
to a more memory-efficient join strategy when necessary.")
+ .withDefaultValue(false);
+
+ public static final ConfigOption<Integer> SMJ_FALLBACK_ROWS_THRESHOLD =
new ConfigOption<>(Integer.class)
+ .withKey("auron.smjfallback.rows.threshold")
+ .withCategory("Operator Supports")
+ .withDescription(
+ "Row count threshold that triggers fallback from hash join
to sort-merge join. When the number of rows "
+ + "in the hash table exceeds this threshold, the
system will switch to sort-merge join to avoid memory issues.")
+ .withDefaultValue(10000000);
+
+ public static final ConfigOption<Integer> SMJ_FALLBACK_MEM_SIZE_THRESHOLD
= new ConfigOption<>(Integer.class)
+ .withKey("auron.smjfallback.mem.threshold")
+ .withCategory("Operator Supports")
+ .withDescription("Memory size threshold in bytes that triggers
fallback from hash join to sort-merge join. "
+ + "When the hash table memory usage exceeds this threshold
(128MB by default), the system switches "
+ + "to sort-merge join to prevent memory overflow.")
+ .withDefaultValue(134217728);
+
+ public static final ConfigOption<Double> ON_HEAP_SPILL_MEM_FRACTION = new
ConfigOption<>(Double.class)
+ .withKey("auron.onHeapSpill.memoryFraction")
+ .withCategory("Runtime Configuration")
+ .withDescription(
+ "Maximum memory fraction allocated for on-heap spilling
operations. This controls what portion "
+ + "of the available on-heap memory can be used for
spilling data to disk when memory pressure occurs.")
+ .withDefaultValue(0.9);
+
+ public static final ConfigOption<Integer> SUGGESTED_BATCH_MEM_SIZE = new
ConfigOption<>(Integer.class)
+ .withKey("auron.suggested.batch.memSize")
+ .withCategory("Runtime Configuration")
+ .withDescription(
+ "Suggested memory size in bytes for record batches. This
setting controls the target memory allocation "
+ + "for individual data batches to optimize memory
usage and processing efficiency. Default is 8MB (8,388,608 bytes).")
+ .withDefaultValue(8388608);
+
+ public static final ConfigOption<Boolean> PARSE_JSON_ERROR_FALLBACK = new
ConfigOption<>(Boolean.class)
+ .withKey("auron.parseJsonError.fallback")
+ .withCategory("Expression/Function Supports")
+ .withDescription(
+ "Enable fallback to UDFJson implementation when native
JSON parsing encounters errors. "
+ + "This ensures query execution continues even
when the native JSON parser fails, at the cost of potentially slower
performance.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Integer>
SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE = new ConfigOption<>(Integer.class)
+ .withKey("auron.suggested.batch.memSize.multiwayMerging")
+ .withCategory("Runtime Configuration")
+ .withDescription(
+ "Suggested memory size in bytes for k-way merging
operations. This uses a smaller batch memory size "
+ + "compared to regular operations since multiple
batches are kept in memory simultaneously during k-way merging. "
+ + "Default is 1MB (1,048,576 bytes).")
+ .withDefaultValue(1048576);
+
+ public static final ConfigOption<Boolean> ORC_FORCE_POSITIONAL_EVOLUTION =
new ConfigOption<>(Boolean.class)
+ .withKey("auron.orc.force.positional.evolution")
+ .withCategory("Data Sources")
+ .withDescription(
+ "Force ORC positional evolution mode for schema evolution
operations. When enabled, column mapping "
+ + "will be based on column position rather than
column name, which can be useful for certain schema evolution scenarios.")
+ .withDefaultValue(false);
+
+ public static final ConfigOption<Boolean> ORC_TIMESTAMP_USE_MICROSECOND =
new ConfigOption<>(Boolean.class)
+ .withKey("auron.orc.timestamp.use.microsecond")
+ .withCategory("Data Sources")
+ .withDescription(
+ "Use microsecond precision when reading ORC timestamp
columns instead of the default millisecond precision. "
+ + "This provides higher temporal resolution for
timestamp data but may require more storage space.")
+ .withDefaultValue(false);
+
+ public static final ConfigOption<Boolean> ORC_SCHEMA_CASE_SENSITIVE = new
ConfigOption<>(Boolean.class)
+ .withKey("auron.orc.schema.caseSensitive.enable")
+ .withCategory("Data Sources")
+ .withDescription(
+ "Enable case-sensitive schema matching for ORC files. When
true, column names in the schema must match "
+ + "the case of columns in the ORC file exactly.
When false, column name matching is case-insensitive.")
+ .withDefaultValue(false);
+
+ public static final ConfigOption<Boolean> FORCE_SHORT_CIRCUIT_AND_OR = new
ConfigOption<>(Boolean.class)
+ .withKey("auron.forceShortCircuitAndOr")
+ .withCategory("Expression/Function Supports")
+ .withDescription(
+ "Force the use of short-circuit evaluation
(PhysicalSCAndExprNode/PhysicalSCOrExprNode) for AND/OR expressions, "
+ + "regardless of whether the right-hand side
contains Hive UDFs. This can improve performance by avoiding unnecessary "
+ + "evaluation of expressions when the result is
already determined.")
+ .withDefaultValue(false);
+
+ public static final ConfigOption<Boolean> ENABLE_SCAN = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.scan")
+ .withCategory("Operator Supports")
+ .withDescription("Enable ScanExec operation conversion to native
Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_PAIMON_SCAN = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.paimon.scan")
+ .withCategory("Operator Supports")
+ .withDescription("Enable PaimonScanExec operation conversion to
native Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_PROJECT = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.project")
+ .withCategory("Operator Supports")
+ .withDescription("Enable ProjectExec operation conversion to
native Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_FILTER = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.filter")
+ .withCategory("Operator Supports")
+ .withDescription("Enable FilterExec operation conversion to native
Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_SORT = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.sort")
+ .withCategory("Operator Supports")
+ .withDescription("Enable SortExec operation conversion to native
Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_UNION = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.union")
+ .withCategory("Operator Supports")
+ .withDescription("Enable UnionExec operation conversion to native
Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_SMJ = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.smj")
+ .withCategory("Operator Supports")
+ .withDescription("Enable SortMergeJoinExec operation conversion to
native Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_SHJ = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.shj")
+ .withCategory("Operator Supports")
+ .withDescription("Enable ShuffledHashJoinExec operation conversion
to native Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_BHJ = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.bhj")
+ .withCategory("Operator Supports")
+ .withDescription("Enable BroadcastHashJoinExec operation
conversion to native Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_BNLJ = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.bnlj")
+ .withCategory("Operator Supports")
+ .withDescription("Enable BroadcastNestedLoopJoinExec operation
conversion to native Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_LOCAL_LIMIT = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.local.limit")
+ .withCategory("Operator Supports")
+ .withDescription("Enable LocalLimitExec operation conversion to
native Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_GLOBAL_LIMIT = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.global.limit")
+ .withCategory("Operator Supports")
+ .withDescription("Enable GlobalLimitExec operation conversion to
native Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_TAKE_ORDERED_AND_PROJECT
= new SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.take.ordered.and.project")
+ .withCategory("Operator Supports")
+ .withDescription("Enable TakeOrderedAndProjectExec operation
conversion to native Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_COLLECT_LIMIT = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.collectLimit")
+ .withCategory("Operator Supports")
+ .withDescription("Enable CollectLimitExec operation conversion to
native Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_AGGR = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.aggr")
+ .withCategory("Operator Supports")
+ .withDescription("Enable AggregateExec operation conversion to
native Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_EXPAND = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.expand")
+ .withCategory("Operator Supports")
+ .withDescription("Enable ExpandExec operation conversion to native
Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_WINDOW = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.window")
+ .withCategory("Operator Supports")
+ .withDescription("Enable WindowExec operation conversion to native
Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_WINDOW_GROUP_LIMIT = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.window.group.limit")
+ .withCategory("Operator Supports")
+ .withDescription("Enable WindowGroupLimitExec operation conversion
to native Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_GENERATE = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.generate")
+ .withCategory("Operator Supports")
+ .withDescription("Enable GenerateExec operation conversion to
native Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_LOCAL_TABLE_SCAN = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.local.table.scan")
+ .withCategory("Operator Supports")
+ .withDescription("Enable LocalTableScanExec operation conversion
to native Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_DATA_WRITING = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.data.writing")
+ .withCategory("Operator Supports")
+ .withDescription("Enable DataWritingExec operation conversion to
native Auron implementations.")
+ .withDefaultValue(false);
+
+ public static final ConfigOption<Boolean> ENABLE_SCAN_PARQUET = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.scan.parquet")
+ .withCategory("Data Sources")
+ .withDescription("Enable ParquetScanExec operation conversion to
native Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_SCAN_PARQUET_TIMESTAMP =
new SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.scan.parquet.timestamp")
+ .withCategory("Data Sources")
+ .withDescription(
+ "Enable ParquetScanExec operation conversion with
timestamp fields to native Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_SCAN_ORC = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.scan.orc")
+ .withCategory("Data Sources")
+ .withDescription("Enable OrcScanExec operation conversion to
native Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_SCAN_ORC_TIMESTAMP = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.scan.orc.timestamp")
+ .withCategory("Data Sources")
+ .withDescription(
+ "Enable OrcScanExec operation conversion with timestamp
fields to native Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_BROADCAST_EXCHANGE = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.broadcastExchange")
+ .withCategory("Operator Supports")
+ .withDescription("Enable BroadcastExchangeExec operation
conversion to native Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> ENABLE_SHUFFLE_EXCHANGE = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.enable.shuffleExchange")
+ .withCategory("Operator Supports")
+ .withDescription("Enable ShuffleExchangeExec operation conversion
to native Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> UDF_JSON_ENABLED = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.udf.UDFJson.enabled")
+ .withCategory("Expression/Function Supports")
+ .withDescription("Enable UDFJson function conversion to native
Auron implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> UDF_BRICKHOUSE_ENABLED = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.udf.brickhouse.enabled")
+ .withCategory("Expression/Function Supports")
+ .withDescription("Enable Brickhouse UDF conversion to native Auron
implementations.")
+ .withDefaultValue(true);
+
+ public static final ConfigOption<Boolean> DECIMAL_ARITH_OP_ENABLED = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.decimal.arithOp.enabled")
+ .withCategory("Expression/Function Supports")
+ .withDescription("Enable decimal arithmetic operations conversion
to native Auron implementations.")
+ .withDefaultValue(false);
+
+ public static final ConfigOption<Boolean> DATETIME_EXTRACT_ENABLED = new
SQLConfOption<>(Boolean.class)
+ .withKey("auron.datetime.extract.enabled")
+ .withCategory("Expression/Function Supports")
+ .withDescription("Enable datetime extract operations conversion to
native Auron implementations.")
+ .withDefaultValue(false);
+
+ public static final ConfigOption<Boolean>
UDF_SINGLE_CHILD_FALLBACK_ENABLED = new SQLConfOption<>(Boolean.class)
+ .withKey("auron.udf.singleChildFallback.enabled")
+ .withCategory("Expression/Function Supports")
+ .withDescription("Enable falling-back UDF/expression with single
child.")
+ .withDefaultValue(true);
@Override
public <T> Optional<T> getOptional(ConfigOption<T> option) {
- if (option.key().startsWith(SPARK_PREFIX)) {
- return Optional.ofNullable(getSparkConf(option.key(),
getOptionDefaultValue(option)));
- } else {
- return Optional.ofNullable(getSparkConf(SPARK_PREFIX +
option.key(), getOptionDefaultValue(option)));
- }
+ GetFromSparkType getFromSparkType = option instanceof SQLConfOption
+ ? GetFromSparkType.FROM_SQL_CONF
+ : option instanceof SparkContextOption
+ ? GetFromSparkType.FROM_SPARK_CONTEXT
+ : GetFromSparkType.FROM_SPARK_ENV;
+ return Optional.ofNullable(getFromSpark(
+ option.key(),
+ option.altKeys(),
+ option.getValueClass(),
+ () -> getOptionDefaultValue(option),
+ getFromSparkType));
}
- @Override
- public <T> Optional<T> getOptional(String key) {
- if (key.startsWith(SPARK_PREFIX)) {
- return Optional.ofNullable(getSparkConf(key, null));
- } else {
- return Optional.ofNullable(getSparkConf(SPARK_PREFIX + key, null));
- }
+ enum GetFromSparkType {
+ FROM_SPARK_ENV,
+ FROM_SPARK_CONTEXT,
+ FROM_SQL_CONF;
}
- private synchronized <T> T getSparkConf(String key, T defaultValue) {
- // Use synchronized to avoid issues with multiple threads.
- synchronized (ConfigEntry.class) {
- ConfigEntry<T> entry = (ConfigEntry<T>) ConfigEntry.findEntry(key);
- if (entry == null) {
- entry = new ConfigEntryWithDefault<>(
- key,
- Option.<String>empty(),
+ @SuppressWarnings("unchecked")
+ private <T> T getFromSpark(
+ String key,
+ List<String> altKeys,
+ Class<T> valueClass,
+ Supplier<T> defaultValueSupplier,
+ GetFromSparkType getFromSparkType) {
+ Object configEntry;
+
+ synchronized (SparkAuronConfiguration.class) {
+ String sparkConfKey = key.startsWith(SPARK_PREFIX) ? key :
SPARK_PREFIX + key;
+ configEntry = ConfigEntry.findEntry(sparkConfKey);
+ for (String altKey : altKeys) {
+ String sparkConfAltKey = altKey.startsWith(SPARK_PREFIX) ?
altKey : SPARK_PREFIX + altKey;
+ if (configEntry != null) {
+ break;
+ }
+ configEntry = ConfigEntry.findEntry(sparkConfAltKey);
+ }
+
+ if (configEntry == null) {
+ configEntry = new ConfigEntryWithDefaultFunction<>(
+ sparkConfKey,
+ Option.empty(),
"",
List$.MODULE$.empty(),
- defaultValue,
- (val) -> valueConverter(val, defaultValue,
defaultValue == null),
+ defaultValueSupplier::get,
+ val -> valueConverter(val, valueClass),
String::valueOf,
null,
true,
null);
}
- return sparkConf.get(entry);
+ }
+
+ if (getFromSparkType == GetFromSparkType.FROM_SPARK_ENV) {
+ return SparkEnv.get().conf().get((ConfigEntry<T>) configEntry);
+
+ } else if (getFromSparkType == GetFromSparkType.FROM_SPARK_CONTEXT) {
+ return SparkContext.getOrCreate().getConf().get((ConfigEntry<T>)
configEntry);
+
+ } else if (getFromSparkType == GetFromSparkType.FROM_SQL_CONF) {
+ return ((ConfigEntry<T>)
configEntry).readFrom(SQLConf.get().reader());
+
+ } else {
+ throw new IllegalArgumentException("unknown getFromSparkType: " +
getFromSparkType);
}
}
- private <T> T valueConverter(String value, T defaultValue, boolean
defaultValueIsNull) {
- if (defaultValueIsNull) {
+ private <T> T valueConverter(String value, Class<T> valueClass) {
+ if (valueClass == Integer.class) {
+ return (T) Integer.valueOf(value);
+ } else if (valueClass == Long.class) {
+ return (T) Long.valueOf(value);
+ } else if (valueClass == Boolean.class) {
+ return (T) Boolean.valueOf(value);
+ } else if (valueClass == Float.class) {
+ return (T) Float.valueOf(value);
+ } else if (valueClass == Double.class) {
+ return (T) Double.valueOf(value);
+ } else if (valueClass == String.class) {
return (T) value;
} else {
- if (defaultValue instanceof Integer) {
- return (T) Integer.valueOf(value);
- } else if (defaultValue instanceof Long) {
- return (T) Long.valueOf(value);
- } else if (defaultValue instanceof Boolean) {
- return (T) Boolean.valueOf(value);
- } else if (defaultValue instanceof Float) {
- return (T) Float.valueOf(value);
- } else if (defaultValue instanceof Double) {
- return (T) Double.valueOf(value);
- } else if (defaultValue instanceof String) {
- return (T) String.valueOf(value);
- } else {
- throw new IllegalArgumentException("Unsupported default value
type: "
- + defaultValue.getClass().getName());
- }
+ throw new IllegalArgumentException("Unsupported default value
type: " + valueClass.getName());
}
}
}
+
+class SparkContextOption<T> extends ConfigOption<T> {
+ SparkContextOption(Class<T> clazz) {
+ super(clazz);
+ }
+}
+
+class SQLConfOption<T> extends ConfigOption<T> {
+ SQLConfOption(Class<T> clazz) {
+ super(clazz);
+ }
+}
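
For reference, a minimal usage sketch (not part of this patch) of the refactored configuration API shown above: options are declared with the fluent ConfigOption builder and resolved through AuronConfiguration#getOptional, which adds the "spark." prefix for non-spark keys and dispatches SQLConfOption/SparkContextOption instances to SQLConf and the SparkContext conf respectively. The MY_FLAG option and ConfigUsageSketch class below are hypothetical and used only for illustration.

```java
import org.apache.auron.configuration.AuronConfiguration;
import org.apache.auron.configuration.ConfigOption;

final class ConfigUsageSketch {
    // Builder-style declaration, mirroring the definitions in this diff.
    static final ConfigOption<Boolean> MY_FLAG = new ConfigOption<>(Boolean.class)
            .withKey("auron.myFeature.enable")
            .withCategory("Operator Supports")
            .withDescription("Hypothetical flag, used only for this example.")
            .withDefaultValue(false);

    static boolean readFlag(AuronConfiguration conf) {
        // Resolves "spark.auron.myFeature.enable" (the SPARK_PREFIX is added for
        // non-spark keys) and falls back to the declared default when unset.
        return conf.getOptional(MY_FLAG).orElse(false);
    }
}
```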
diff --git
a/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfigurationDocGenerator.java
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfigurationDocGenerator.java
new file mode 100644
index 00000000..ea587651
--- /dev/null
+++
b/spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfigurationDocGenerator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.spark.configuration;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.auron.configuration.ConfigOption;
+
+public class SparkAuronConfigurationDocGenerator {
+
+ // Generate documentation for SparkAuronConfiguration
+ public static void main(String[] args) {
+ // Categories array based on SparkAuronConfiguration categories
+ String[] categories = {
+ "Runtime Configuration",
+ "Operator Supports",
+ "Data Sources",
+ "Expression/Function Supports",
+ "UDAF Fallback",
+ "Partial Aggregate Skipping"
+ };
+ Class<SparkAuronConfiguration> auronConfigurationClass =
SparkAuronConfiguration.class;
+ List<ConfigOption<?>> configOptions = new ArrayList<>();
+
+ for (Field field : auronConfigurationClass.getFields()) {
+ try {
+ configOptions.add((ConfigOption<?>) field.get(null));
+ } catch (IllegalAccessException | ClassCastException e) {
+ // this is not a config option
+ }
+ }
+ configOptions.sort(Comparator.comparing(option -> option.category() +
option.key()));
+
+ for (String category : categories) {
+ System.out.println();
+ System.out.println("### " + category);
+ System.out.println("| Conf Key | Type | Default Value |
Description |");
+ System.out.println("| -------- | ---- | ------------- |
----------- |");
+ for (ConfigOption<?> configOption : configOptions) {
+ if (!configOption.category().equals(category)) {
+ continue;
+ }
+ String sparkConfKey =
+ configOption.key().startsWith("spark.") ?
configOption.key() : "spark." + configOption.key();
+ for (String altKey : configOption.altKeys()) {
+ String sparkConfAltKey = altKey.startsWith("spark.") ?
altKey : "spark." + altKey;
+ sparkConfKey += "<br/> _alternative: " +
sparkConfAltKey + "_";
+ }
+ String sparkConfDesc = configOption.description();
+ Class<?> sparkConfValueClass = configOption.getValueClass();
+ Object sparkConfDefaultValue = configOption.defaultValue() ==
null ? "-" : configOption.defaultValue();
+ System.out.println("| " + sparkConfKey + " | " +
sparkConfValueClass.getSimpleName() + " | "
+ + sparkConfDefaultValue + " | " + sparkConfDesc + "
|");
+ }
+ }
+ }
+}
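
For illustration only (not part of this patch), a run of the generator prints one markdown table per category in the shape below; the row shown is abbreviated:

```
### Partial Aggregate Skipping
| Conf Key | Type | Default Value | Description |
| -------- | ---- | ------------- | ----------- |
| spark.auron.partialAggSkipping.enable | Boolean | true | Enable partial aggregate skipping optimization ... |
```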
diff --git
a/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java
b/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java
deleted file mode 100644
index 2943d4b2..00000000
--- a/spark-extension/src/main/java/org/apache/spark/sql/auron/AuronConf.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.auron;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkEnv$;
-
-/**
- * This class has been deprecated and migrated to {@link
org.apache.auron.spark.configuration.SparkAuronConfiguration}.
- * Will be removed in the future.
- */
-@Deprecated
-@SuppressWarnings("unused")
-public enum AuronConf {
- // support spark.auron.ui.enabled
- UI_ENABLED("spark.auron.ui.enabled", true),
-
- /// suggested batch size for arrow batches.
- BATCH_SIZE("spark.auron.batchSize", 10000),
-
- /// suggested fraction of off-heap memory used in native execution.
- /// actual off-heap memory usage is expected to be
spark.executor.memoryOverhead * fraction.
- MEMORY_FRACTION("spark.auron.memoryFraction", 0.6),
-
- /// suggested fraction of process total memory (on-heap and off-heap).
- /// this limit is for process's resident memory usage
- PROCESS_MEMORY_FRACTION("spark.auron.process.vmrss.memoryFraction", 0.9),
-
- /// enable converting upper/lower functions to native, special cases may
provide different
- /// outputs from spark due to different unicode versions.
- CASE_CONVERT_FUNCTIONS_ENABLE("spark.auron.enable.caseconvert.functions",
true),
-
- /// enable extra metrics of input batch statistics
- INPUT_BATCH_STATISTICS_ENABLE("spark.auron.enableInputBatchStatistics",
true),
-
- /// supports UDAF and other aggregate functions not implemented
- UDAF_FALLBACK_ENABLE("spark.auron.udafFallback.enable", true),
-
- // TypedImperativeAggregate one row mem use size
- SUGGESTED_UDAF_ROW_MEM_USAGE("spark.auron.suggested.udaf.memUsedSize", 64),
-
- /// number of udafs to trigger sort-based aggregation
- /// by default, all aggs containing udafs are converted to sort-based
-
UDAF_FALLBACK_NUM_UDAFS_TRIGGER_SORT_AGG("spark.auron.udafFallback.num.udafs.trigger.sortAgg",
1),
-
- // TypedImperativeAggregate one row mem use size
-
UDAF_FALLBACK_ESTIM_ROW_SIZE("spark.auron.udafFallback.typedImperativeEstimatedRowSize",
256),
-
- /// enable trimming string inputs before casting to numeric/boolean types
- CAST_STRING_TRIM_ENABLE("spark.auron.cast.trimString", true),
-
- /// ignore corrupted input files
- IGNORE_CORRUPTED_FILES("spark.files.ignoreCorruptFiles", false),
-
- /// enable partial aggregate skipping (see
https://github.com/apache/auron/issues/327)
- PARTIAL_AGG_SKIPPING_ENABLE("spark.auron.partialAggSkipping.enable", true),
-
- /// partial aggregate skipping ratio
- PARTIAL_AGG_SKIPPING_RATIO("spark.auron.partialAggSkipping.ratio", 0.9),
-
- /// minimum number of rows to trigger partial aggregate skipping
- PARTIAL_AGG_SKIPPING_MIN_ROWS("spark.auron.partialAggSkipping.minRows",
BATCH_SIZE.intConf() * 5),
-
- /// always skip partial aggregate when triggered spilling
-
PARTIAL_AGG_SKIPPING_SKIP_SPILL("spark.auron.partialAggSkipping.skipSpill",
false),
-
- // parquet enable page filtering
- PARQUET_ENABLE_PAGE_FILTERING("spark.auron.parquet.enable.pageFiltering",
false),
-
- // parquet enable bloom filter
- PARQUET_ENABLE_BLOOM_FILTER("spark.auron.parquet.enable.bloomFilter",
false),
-
- // parquet max over read size
- PARQUET_MAX_OVER_READ_SIZE("spark.auron.parquet.maxOverReadSize", 16384),
-
- // parquet metadata cache size
- PARQUET_METADATA_CACHE_SIZE("spark.auron.parquet.metadataCacheSize", 5),
-
- // spark io compression codec
- SPARK_IO_COMPRESSION_CODEC("spark.io.compression.codec", "lz4"),
-
- // spark io compression zstd level
- SPARK_IO_COMPRESSION_ZSTD_LEVEL("spark.io.compression.zstd.level", 1),
-
- // tokio worker threads per cpu (spark.task.cpus), 0 for auto detection
- TOKIO_WORKER_THREADS_PER_CPU("spark.auron.tokio.worker.threads.per.cpu",
0),
-
- // number of cpus per task
- SPARK_TASK_CPUS("spark.task.cpus", 1),
-
- // replace all sort-merge join to shuffled-hash join, only used for
benchmarking
- FORCE_SHUFFLED_HASH_JOIN("spark.auron.forceShuffledHashJoin", false),
-
- // shuffle compression target buffer size, default is 4MB
-
SHUFFLE_COMPRESSION_TARGET_BUF_SIZE("spark.auron.shuffle.compression.targetBufSize",
4194304),
-
- // spark spill compression codec
- SPILL_COMPRESSION_CODEC("spark.auron.spill.compression.codec", "lz4"),
-
- // enable hash join falling back to sort merge join when hash table is too
big
- SMJ_FALLBACK_ENABLE("spark.auron.smjfallback.enable", false),
-
- // smj fallback threshold
- SMJ_FALLBACK_ROWS_THRESHOLD("spark.auron.smjfallback.rows.threshold",
10000000),
-
- // smj fallback threshold
- SMJ_FALLBACK_MEM_SIZE_THRESHOLD("spark.auron.smjfallback.mem.threshold",
134217728),
-
- // max memory fraction of on-heap spills
- ON_HEAP_SPILL_MEM_FRACTION("spark.auron.onHeapSpill.memoryFraction", 0.9),
-
- // suggested memory size for record batch
- SUGGESTED_BATCH_MEM_SIZE("spark.auron.suggested.batch.memSize", 8388608),
-
- // fallback to UDFJson when error parsing json in native implementation
- PARSE_JSON_ERROR_FALLBACK("spark.auron.parseJsonError.fallback", true),
-
- // suggested memory size for k-way merging
- // use smaller batch memory size for kway merging since there will be
multiple
- // batches in memory at the same time
-
SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE("spark.auron.suggested.batch.memSize.multiwayMerging",
1048576),
-
-
ORC_FORCE_POSITIONAL_EVOLUTION("spark.auron.orc.force.positional.evolution",
false),
-
- // use microsecond precision when reading ORC timestamp columns
- ORC_TIMESTAMP_USE_MICROSECOND("spark.auron.orc.timestamp.use.microsecond",
false),
-
- ORC_SCHEMA_CASE_SENSITIVE("spark.auron.orc.schema.caseSensitive.enable",
false),
-
- NATIVE_LOG_LEVEL("spark.auron.native.log.level", "info");
-
- public final String key;
- private final Object defaultValue;
-
- AuronConf(String key, Object defaultValue) {
- this.key = key;
- this.defaultValue = defaultValue;
- }
-
- public boolean booleanConf() {
- return conf().getBoolean(key, (boolean) defaultValue);
- }
-
- public int intConf() {
- return conf().getInt(key, (int) defaultValue);
- }
-
- public long longConf() {
- return conf().getLong(key, (long) defaultValue);
- }
-
- public double doubleConf() {
- return conf().getDouble(key, (double) defaultValue);
- }
-
- public String stringConf() {
- return conf().get(key, (String) defaultValue);
- }
-
- public static boolean booleanConf(String confName) {
- return AuronConf.valueOf(confName).booleanConf();
- }
-
- public static int intConf(String confName) {
- return AuronConf.valueOf(confName).intConf();
- }
-
- public static long longConf(String confName) {
- return AuronConf.valueOf(confName).longConf();
- }
-
- public static double doubleConf(String confName) {
- return AuronConf.valueOf(confName).doubleConf();
- }
-
- public static String stringConf(String confName) {
- return AuronConf.valueOf(confName).stringConf();
- }
-
- private static SparkConf conf() {
- return SparkEnv$.MODULE$.get().conf();
- }
-}
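
For call sites migrating off the removed enum, a minimal before/after sketch (not part of this patch): the Scala call sites updated later in this diff (AuronConverters) use the same get() accessor, and the AuronConfMigrationSketch class below is hypothetical.

```java
import org.apache.auron.spark.configuration.SparkAuronConfiguration;

final class AuronConfMigrationSketch {
    // Before (removed in this commit): raw string keys or the AuronConf enum, e.g.
    //     getBooleanConf("spark.auron.enable.scan", true);
    //     AuronConf.FORCE_SHUFFLED_HASH_JOIN.booleanConf();
    // After: read the typed ConfigOption constant; this needs an active Spark
    // session, since ENABLE_SCAN is a SQLConfOption resolved through SQLConf.
    static boolean scanEnabled() {
        return SparkAuronConfiguration.ENABLE_SCAN.get();
    }
}
```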
diff --git
a/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java
b/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java
deleted file mode 100644
index 3bea6e17..00000000
--- a/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.auron;
-
-import java.lang.management.BufferPoolMXBean;
-import java.lang.management.ManagementFactory;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.auron.functions.AuronUDFWrapperContext;
-import org.apache.auron.hadoop.fs.FSDataInputWrapper;
-import org.apache.auron.hadoop.fs.FSDataInputWrapper$;
-import org.apache.auron.hadoop.fs.FSDataOutputWrapper;
-import org.apache.auron.hadoop.fs.FSDataOutputWrapper$;
-import org.apache.auron.memory.OnHeapSpillManager;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.spark.SparkEnv;
-import org.apache.spark.TaskContext;
-import org.apache.spark.TaskContext$;
-import org.apache.spark.sql.auron.memory.SparkOnHeapSpillManager$;
-import org.apache.spark.sql.auron.util.TaskContextHelper$;
-
-/**
- * This class has been deprecated and migrated to {@link
org.apache.auron.jni.JniBridge}.
- * Will be removed in the future.
- */
-@Deprecated
-@SuppressWarnings("unused")
-public class JniBridge {
-
- @Deprecated
- public static final ConcurrentHashMap<String, Object> resourcesMap = new
ConcurrentHashMap<>();
-
- @Deprecated
- public static native long callNative(long initNativeMemory, String
logLevel, AuronCallNativeWrapper wrapper);
-
- @Deprecated
- public static native boolean nextBatch(long ptr);
-
- @Deprecated
- public static native void finalizeNative(long ptr);
-
- @Deprecated
- public static native void onExit();
-
- @Deprecated
- public static ClassLoader getContextClassLoader() {
- return Thread.currentThread().getContextClassLoader();
- }
-
- @Deprecated
- public static void setContextClassLoader(ClassLoader cl) {
- Thread.currentThread().setContextClassLoader(cl);
- }
-
- @Deprecated
- public static Object getResource(String key) {
- return resourcesMap.remove(key);
- }
-
- @Deprecated
- public static TaskContext getTaskContext() {
- return TaskContext$.MODULE$.get();
- }
-
- @Deprecated
- public static OnHeapSpillManager getTaskOnHeapSpillManager() {
- return SparkOnHeapSpillManager$.MODULE$.current();
- }
-
- @Deprecated
- public static boolean isTaskRunning() {
- TaskContext tc = getTaskContext();
- if (tc == null) { // driver is always running
- return true;
- }
- return !tc.isCompleted() && !tc.isInterrupted();
- }
-
- @Deprecated
- public static FSDataInputWrapper openFileAsDataInputWrapper(FileSystem fs,
String path) throws Exception {
- // the path is a URI string, so we need to convert it to a URI object,
ref:
- // org.apache.spark.paths.SparkPath.toPath
- return FSDataInputWrapper$.MODULE$.wrap(fs.open(new Path(new
URI(path))));
- }
-
- @Deprecated
- public static FSDataOutputWrapper createFileAsDataOutputWrapper(FileSystem
fs, String path) throws Exception {
- return FSDataOutputWrapper$.MODULE$.wrap(fs.create(new Path(new
URI(path))));
- }
-
- @Deprecated
- private static final List<BufferPoolMXBean> directMXBeans =
- ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
-
- @Deprecated
- public static long getTotalMemoryLimited() {
- return NativeHelper$.MODULE$.totalMemory();
- }
-
- @Deprecated
- public static long getDirectMemoryUsed() {
- return directMXBeans.stream()
- .mapToLong(BufferPoolMXBean::getTotalCapacity)
- .sum();
- }
-
- @Deprecated
- public static String getDirectWriteSpillToDiskFile() {
- return SparkEnv.get()
- .blockManager()
- .diskBlockManager()
- .createTempLocalBlock()
- ._2
- .getPath();
- }
-
- @Deprecated
- public static void initNativeThread(ClassLoader cl, TaskContext tc) {
- setContextClassLoader(cl);
- TaskContext$.MODULE$.setTaskContext(tc);
- TaskContextHelper$.MODULE$.setNativeThreadName();
- TaskContextHelper$.MODULE$.setHDFSCallerContext();
- }
-
- @Deprecated
- public static AuronUDFWrapperContext getAuronUDFWrapperContext(ByteBuffer
udfSerialized) {
- throw new UnsupportedOperationException("This API is designed to
support next-generation multi-engine.");
- }
-}
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala
deleted file mode 100644
index b4028c1e..00000000
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.auron
-
-import java.io.File
-import java.io.IOException
-import java.nio.file.Files
-import java.nio.file.StandardCopyOption
-import java.util.concurrent.atomic.AtomicReference
-
-import scala.annotation.nowarn
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.arrow.c.ArrowArray
-import org.apache.arrow.c.ArrowSchema
-import org.apache.arrow.c.CDataDictionaryProvider
-import org.apache.arrow.c.Data
-import org.apache.arrow.vector.VectorSchemaRoot
-import org.apache.arrow.vector.types.pojo.Schema
-import org.apache.spark.Partition
-import org.apache.spark.TaskContext
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.auron.util.Using
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
-import org.apache.spark.sql.execution.auron.arrowio.util.ArrowUtils
-import
org.apache.spark.sql.execution.auron.arrowio.util.ArrowUtils.ROOT_ALLOCATOR
-import org.apache.spark.sql.execution.auron.columnar.ColumnarHelper
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.CompletionIterator
-import org.apache.spark.util.ShutdownHookManager
-import org.apache.spark.util.Utils
-
-import org.apache.auron.metric.{MetricNode, SparkMetricNode}
-import org.apache.auron.protobuf.PartitionId
-import org.apache.auron.protobuf.PhysicalPlanNode
-import org.apache.auron.protobuf.TaskDefinition
-
-/**
- * This class has been deprecated and migrated to {@link
- * org.apache.auron.jni.AuronCallNativeWrapper}. Will be removed in the future.
- */
-@nowarn("cat=deprecation") // JniBridge is temporarily used (deprecated)
-@Deprecated
-case class AuronCallNativeWrapper(
- nativePlan: PhysicalPlanNode,
- partition: Partition,
- context: Option[TaskContext],
- metrics: SparkMetricNode)
- extends Logging {
-
- AuronCallNativeWrapper.initNative()
-
- private val error: AtomicReference[Throwable] = new AtomicReference(null)
- private val dictionaryProvider = new CDataDictionaryProvider()
- private var arrowSchema: Schema = _
- private var schema: StructType = _
- private var toUnsafe: UnsafeProjection = _
- private val batchRows: ArrayBuffer[InternalRow] = ArrayBuffer()
- private var batchCurRowIdx = 0
-
- logInfo(s"Start executing native plan ${nativePlan.getPhysicalPlanTypeCase}")
- private var nativeRuntimePtr =
- JniBridge.callNative(NativeHelper.nativeMemory,
AuronConf.NATIVE_LOG_LEVEL.stringConf(), this)
-
- private lazy val rowIterator = new Iterator[InternalRow] {
- override def hasNext: Boolean = {
- checkError()
-
- if (batchCurRowIdx < batchRows.length) {
- return true
- }
-
- // clear current batch
- batchRows.clear()
- batchCurRowIdx = 0
-
- // load next batch
- try {
- if (nativeRuntimePtr != 0 && JniBridge.nextBatch(nativeRuntimePtr)) {
- return hasNext
- }
- } finally {
- // if error has been set, throw set error instead of this caught
exception
- checkError()
- }
- false
- }
-
- @Deprecated
- override def next(): InternalRow = {
- val batchRow = batchRows(batchCurRowIdx)
- batchCurRowIdx += 1
- batchRow
- }
- }
-
- context.foreach(_.addTaskCompletionListener[Unit]((_: TaskContext) =>
close()))
- context.foreach(_.addTaskFailureListener((_, _) => close()))
-
- @Deprecated
- def getRowIterator: Iterator[InternalRow] = {
- CompletionIterator[InternalRow, Iterator[InternalRow]](rowIterator,
close())
- }
-
- @Deprecated
- protected def getMetrics: MetricNode =
- metrics
-
- @Deprecated
- protected def importSchema(ffiSchemaPtr: Long): Unit = {
- Using.resource(ArrowSchema.wrap(ffiSchemaPtr)) { ffiSchema =>
- arrowSchema = Data.importSchema(ROOT_ALLOCATOR, ffiSchema,
dictionaryProvider)
- schema = ArrowUtils.fromArrowSchema(arrowSchema)
- toUnsafe = UnsafeProjection.create(schema)
- }
- }
-
- @Deprecated
- protected def importBatch(ffiArrayPtr: Long): Unit = {
- if (nativeRuntimePtr == 0) {
- throw new RuntimeException("Native runtime is finalized")
- }
-
- Using.resources(
- ArrowArray.wrap(ffiArrayPtr),
- VectorSchemaRoot.create(arrowSchema, ROOT_ALLOCATOR)) { case (ffiArray,
root) =>
- Data.importIntoVectorSchemaRoot(ROOT_ALLOCATOR, ffiArray, root,
dictionaryProvider)
-
- batchRows.append(
- ColumnarHelper
- .rootRowsIter(root)
- .map(row => toUnsafe(row).copy().asInstanceOf[InternalRow])
- .toSeq: _*)
- }
- }
-
- @Deprecated
- protected def setError(error: Throwable): Unit = {
- this.error.set(error)
- }
-
- @Deprecated
- protected def checkError(): Unit = {
- val throwable = error.getAndSet(null)
- if (throwable != null) {
- close()
- throw throwable
- }
- }
-
- @Deprecated
- protected def getRawTaskDefinition: Array[Byte] = {
- val partitionId: PartitionId = PartitionId
- .newBuilder()
- .setPartitionId(partition.index)
- .setStageId(context.map(_.stageId()).getOrElse(0))
- .setTaskId(context.map(_.taskAttemptId()).getOrElse(0))
- .build()
-
- val taskDefinition = TaskDefinition
- .newBuilder()
- .setTaskId(partitionId)
- .setPlan(nativePlan)
- .build()
- taskDefinition.toByteArray
- }
-
- private def close(): Unit = {
- synchronized {
- batchRows.clear()
- batchCurRowIdx = 0
-
- if (nativeRuntimePtr != 0) {
- JniBridge.finalizeNative(nativeRuntimePtr)
- nativeRuntimePtr = 0
- dictionaryProvider.close()
- checkError()
- }
- }
- }
-}
-
-@nowarn("cat=deprecation") // JniBridge is temporarily used (deprecated)
-object AuronCallNativeWrapper extends Logging {
- def initNative(): Unit = {
- lazyInitNative
- }
-
- private lazy val lazyInitNative: Unit = {
- logInfo(
- "Initializing native environment (" +
- s"batchSize=${AuronConf.BATCH_SIZE.intConf()}, " +
- s"nativeMemory=${NativeHelper.nativeMemory}, " +
- s"memoryFraction=${AuronConf.MEMORY_FRACTION.doubleConf()})")
-
- // arrow configuration
- System.setProperty("arrow.struct.conflict.policy", "CONFLICT_APPEND")
-
- assert(classOf[JniBridge] != null) // preload JNI bridge classes
- AuronCallNativeWrapper.loadLibAuron()
- ShutdownHookManager.addShutdownHook(() => JniBridge.onExit())
- }
-
- private def loadLibAuron(): Unit = {
- val libName = System.mapLibraryName("auron")
- try {
- val classLoader = classOf[NativeSupports].getClassLoader
- val tempFile = File.createTempFile("libauron-", ".tmp")
- tempFile.deleteOnExit()
-
- Utils.tryWithResource {
- val is = classLoader.getResourceAsStream(libName)
- assert(is != null, s"cannot load $libName")
- is
- }(Files.copy(_, tempFile.toPath, StandardCopyOption.REPLACE_EXISTING))
- System.load(tempFile.getAbsolutePath)
-
- } catch {
- case e: IOException =>
- throw new IllegalStateException("error loading native libraries: " + e)
- }
- }
-}
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
index 9b8bed91..def645d5 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
@@ -80,58 +80,34 @@ import org.apache.auron.protobuf.PhysicalPlanNode
import org.apache.auron.spark.configuration.SparkAuronConfiguration
object AuronConverters extends Logging {
- def enableScan: Boolean =
- getBooleanConf("spark.auron.enable.scan", defaultValue = true)
- def enableProject: Boolean =
- getBooleanConf("spark.auron.enable.project", defaultValue = true)
- def enableFilter: Boolean =
- getBooleanConf("spark.auron.enable.filter", defaultValue = true)
- def enableSort: Boolean =
- getBooleanConf("spark.auron.enable.sort", defaultValue = true)
- def enableUnion: Boolean =
- getBooleanConf("spark.auron.enable.union", defaultValue = true)
- def enableSmj: Boolean =
- getBooleanConf("spark.auron.enable.smj", defaultValue = true)
- def enableShj: Boolean =
- getBooleanConf("spark.auron.enable.shj", defaultValue = true)
- def enableBhj: Boolean =
- getBooleanConf("spark.auron.enable.bhj", defaultValue = true)
- def enableBnlj: Boolean =
- getBooleanConf("spark.auron.enable.bnlj", defaultValue = true)
- def enableLocalLimit: Boolean =
- getBooleanConf("spark.auron.enable.local.limit", defaultValue = true)
- def enableGlobalLimit: Boolean =
- getBooleanConf("spark.auron.enable.global.limit", defaultValue = true)
+ def enableScan: Boolean = SparkAuronConfiguration.ENABLE_SCAN.get()
+ def enableProject: Boolean = SparkAuronConfiguration.ENABLE_PROJECT.get()
+ def enableFilter: Boolean = SparkAuronConfiguration.ENABLE_FILTER.get()
+ def enableSort: Boolean = SparkAuronConfiguration.ENABLE_SORT.get()
+ def enableUnion: Boolean = SparkAuronConfiguration.ENABLE_UNION.get()
+ def enableSmj: Boolean = SparkAuronConfiguration.ENABLE_SMJ.get()
+ def enableShj: Boolean = SparkAuronConfiguration.ENABLE_SHJ.get()
+ def enableBhj: Boolean = SparkAuronConfiguration.ENABLE_BHJ.get()
+ def enableBnlj: Boolean = SparkAuronConfiguration.ENABLE_BNLJ.get()
+ def enableLocalLimit: Boolean = SparkAuronConfiguration.ENABLE_LOCAL_LIMIT.get()
+ def enableGlobalLimit: Boolean = SparkAuronConfiguration.ENABLE_GLOBAL_LIMIT.get()
def enableTakeOrderedAndProject: Boolean =
- getBooleanConf("spark.auron.enable.take.ordered.and.project", defaultValue = true)
- def enableCollectLimit: Boolean =
- getBooleanConf("spark.auron.enable.collectLimit", defaultValue = true)
- def enableAggr: Boolean =
- getBooleanConf("spark.auron.enable.aggr", defaultValue = true)
- def enableExpand: Boolean =
- getBooleanConf("spark.auron.enable.expand", defaultValue = true)
- def enableWindow: Boolean =
- getBooleanConf("spark.auron.enable.window", defaultValue = true)
- def enableWindowGroupLimit: Boolean =
- getBooleanConf("spark.auron.enable.window.group.limit", defaultValue = true)
- def enableGenerate: Boolean =
- getBooleanConf("spark.auron.enable.generate", defaultValue = true)
- def enableLocalTableScan: Boolean =
- getBooleanConf("spark.auron.enable.local.table.scan", defaultValue = true)
- def enableDataWriting: Boolean =
- getBooleanConf("spark.auron.enable.data.writing", defaultValue = false)
- def enableScanParquet: Boolean =
- getBooleanConf("spark.auron.enable.scan.parquet", defaultValue = true)
+ SparkAuronConfiguration.ENABLE_TAKE_ORDERED_AND_PROJECT.get()
+ def enableCollectLimit: Boolean = SparkAuronConfiguration.ENABLE_COLLECT_LIMIT.get()
+ def enableAggr: Boolean = SparkAuronConfiguration.ENABLE_AGGR.get()
+ def enableExpand: Boolean = SparkAuronConfiguration.ENABLE_EXPAND.get()
+ def enableWindow: Boolean = SparkAuronConfiguration.ENABLE_WINDOW.get()
+ def enableWindowGroupLimit: Boolean = SparkAuronConfiguration.ENABLE_WINDOW_GROUP_LIMIT.get()
+ def enableGenerate: Boolean = SparkAuronConfiguration.ENABLE_GENERATE.get()
+ def enableLocalTableScan: Boolean = SparkAuronConfiguration.ENABLE_LOCAL_TABLE_SCAN.get()
+ def enableDataWriting: Boolean = SparkAuronConfiguration.ENABLE_DATA_WRITING.get()
+ def enableScanParquet: Boolean = SparkAuronConfiguration.ENABLE_SCAN_PARQUET.get()
def enableScanParquetTimestamp: Boolean =
- getBooleanConf("spark.auron.enable.scan.parquet.timestamp", defaultValue = true)
- def enableScanOrc: Boolean =
- getBooleanConf("spark.auron.enable.scan.orc", defaultValue = true)
- def enableScanOrcTimestamp: Boolean =
- getBooleanConf("spark.auron.enable.scan.orc.timestamp", defaultValue = true)
- def enableBroadcastExchange: Boolean =
- getBooleanConf("spark.auron.enable.broadcastExchange", defaultValue = true)
- def enableShuffleExechange: Boolean =
- getBooleanConf("spark.auron.enable.shuffleExchange", defaultValue = true)
+ SparkAuronConfiguration.ENABLE_SCAN_PARQUET_TIMESTAMP.get()
+ def enableScanOrc: Boolean = SparkAuronConfiguration.ENABLE_SCAN_ORC.get()
+ def enableScanOrcTimestamp: Boolean = SparkAuronConfiguration.ENABLE_SCAN_ORC_TIMESTAMP.get()
+ def enableBroadcastExchange: Boolean = SparkAuronConfiguration.ENABLE_BROADCAST_EXCHANGE.get()
+ def enableShuffleExechange: Boolean = SparkAuronConfiguration.ENABLE_SHUFFLE_EXCHANGE.get()
private val extConvertProviders =
ServiceLoader.load(classOf[AuronConvertProvider]).asScala
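
The hunk above replaces every string-keyed getBooleanConf lookup with a typed ConfigOption constant read via get(). A minimal sketch of how a caller consumes one of these flags after the change, assuming only the ENABLE_SCAN option and its get() accessor shown in the diff; the wrapper object is illustrative:

    import org.apache.auron.spark.configuration.SparkAuronConfiguration

    object ScanFlagSketch {
      // Reads the typed option directly; no string key or default value at the call site.
      def nativeScanEnabled: Boolean = SparkAuronConfiguration.ENABLE_SCAN.get()

      def main(args: Array[String]): Unit =
        println(s"native scan enabled: $nativeScanEnabled")
    }
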
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala
index 1f8b6421..b68b0495 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala
@@ -18,7 +18,6 @@ package org.apache.spark.sql.auron
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.rules.Rule
@@ -27,6 +26,8 @@ import org.apache.spark.sql.execution.LocalTableScanExec
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.internal.SQLConf
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
+
class AuronSparkSessionExtension extends (SparkSessionExtensions => Unit) with Logging {
Shims.get.initExtension()
@@ -35,7 +36,6 @@ class AuronSparkSessionExtension extends (SparkSessionExtensions => Unit) with L
SparkEnv.get.conf.set(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key, "true")
logInfo(s"${classOf[AuronSparkSessionExtension].getName} enabled")
- assert(AuronSparkSessionExtension.auronEnabledKey != null)
Shims.get.onApplyingExtension()
extensions.injectColumnar(sparkSession => {
@@ -45,11 +45,6 @@ class AuronSparkSessionExtension extends (SparkSessionExtensions => Unit) with L
}
object AuronSparkSessionExtension extends Logging {
- lazy val auronEnabledKey: ConfigEntry[Boolean] = SQLConf
- .buildConf("spark.auron.enable")
- .booleanConf
- .createWithDefault(true)
-
def dumpSimpleSparkPlanTreeNode(exec: SparkPlan, depth: Int = 0): Unit = {
val nodeName = exec.nodeName
val convertible = exec
@@ -68,7 +63,7 @@ case class AuronColumnarOverrides(sparkSession: SparkSession) extends ColumnarRu
override def preColumnarTransitions: Rule[SparkPlan] = {
new Rule[SparkPlan] {
override def apply(sparkPlan: SparkPlan): SparkPlan = {
- if (!sparkPlan.conf.getConf(auronEnabledKey)) {
+ if (!SparkAuronConfiguration.AURON_ENABLED.get()) {
return sparkPlan // performs no conversion if auron is not enabled
}
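
With the session-local spark.auron.enable ConfigEntry removed, the columnar rule above now consults the global AURON_ENABLED option. A sketch of that gating pattern, assuming only that AURON_ENABLED exposes a Boolean get() as shown in the diff; the rule name below is illustrative:

    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.auron.spark.configuration.SparkAuronConfiguration

    object AuronGateSketch extends Rule[SparkPlan] {
      override def apply(plan: SparkPlan): SparkPlan =
        if (!SparkAuronConfiguration.AURON_ENABLED.get()) {
          plan // leave the plan untouched when Auron is disabled
        } else {
          plan // conversion to native operators would start here
        }
    }
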
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
index 8b48c39a..7a3bde2c 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala
@@ -81,31 +81,17 @@ import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
import org.apache.auron.{protobuf => pb}
-import org.apache.auron.configuration.AuronConfiguration
-import org.apache.auron.jni.AuronAdaptor
import org.apache.auron.protobuf.PhysicalExprNode
import org.apache.auron.spark.configuration.SparkAuronConfiguration
object NativeConverters extends Logging {
-
- private def sparkAuronConfig: AuronConfiguration =
- AuronAdaptor.getInstance.getAuronConfiguration
- def udfEnabled: Boolean =
- AuronConverters.getBooleanConf("spark.auron.udf.enabled", defaultValue =
true)
- def udfJsonEnabled: Boolean =
- AuronConverters.getBooleanConf("spark.auron.udf.UDFJson.enabled",
defaultValue = true)
- def udfBrickHouseEnabled: Boolean =
- AuronConverters.getBooleanConf("spark.auron.udf.brickhouse.enabled",
defaultValue = true)
- def decimalArithOpEnabled: Boolean =
- AuronConverters.getBooleanConf("spark.auron.decimal.arithOp.enabled",
defaultValue = false)
- def datetimeExtractEnabled: Boolean =
- AuronConverters.getBooleanConf("spark.auron.datetime.extract.enabled",
defaultValue = false)
- def castTrimStringEnabled: Boolean =
- AuronConverters.getBooleanConf("spark.auron.cast.trimString", defaultValue
= true)
+ def udfJsonEnabled: Boolean = SparkAuronConfiguration.UDF_JSON_ENABLED.get()
+ def udfBrickHouseEnabled: Boolean =
SparkAuronConfiguration.UDF_BRICKHOUSE_ENABLED.get()
+ def decimalArithOpEnabled: Boolean =
SparkAuronConfiguration.DECIMAL_ARITH_OP_ENABLED.get()
+ def datetimeExtractEnabled: Boolean =
SparkAuronConfiguration.DATETIME_EXTRACT_ENABLED.get()
+ def castTrimStringEnabled: Boolean =
SparkAuronConfiguration.CAST_STRING_TRIM_ENABLE.get()
def singleChildFallbackEnabled: Boolean =
- AuronConverters.getBooleanConf(
- "spark.auron.expression.singleChildFallback.enabled",
- defaultValue = true)
+ SparkAuronConfiguration.UDF_SINGLE_CHILD_FALLBACK_ENABLED.get()
/**
* Is the data type(scalar or complex) supported by Auron.
@@ -781,7 +767,7 @@ object NativeConverters extends Logging {
// if rhs is complex in and/or operators, use short-circuiting implementation
// or if forceShortCircuitAndOr is enabled, always use short-circuiting
case And(lhs, rhs)
- if sparkAuronConfig.getBoolean(SparkAuronConfiguration.FORCE_SHORT_CIRCUIT_AND_OR)
+ if SparkAuronConfiguration.FORCE_SHORT_CIRCUIT_AND_OR.get()
|| rhs.find(HiveUDFUtil.isHiveUDF).isDefined =>
buildExprNode {
_.setScAndExpr(
@@ -791,7 +777,7 @@ object NativeConverters extends Logging {
.setRight(convertExprWithFallback(rhs, isPruningExpr, fallback)))
}
case Or(lhs, rhs)
- if sparkAuronConfig.getBoolean(SparkAuronConfiguration.FORCE_SHORT_CIRCUIT_AND_OR)
+ if SparkAuronConfiguration.FORCE_SHORT_CIRCUIT_AND_OR.get()
|| rhs.find(HiveUDFUtil.isHiveUDF).isDefined =>
buildExprNode {
_.setScOrExpr(
@@ -889,11 +875,9 @@ object NativeConverters extends Logging {
case Length(arg) if arg.dataType == StringType =>
buildScalarFunction(pb.ScalarFunction.CharacterLength, arg :: Nil, IntegerType)
- case e: Lower
- if sparkAuronConfig.getBoolean(SparkAuronConfiguration.CASE_CONVERT_FUNCTIONS_ENABLE) =>
+ case e: Lower if SparkAuronConfiguration.CASE_CONVERT_FUNCTIONS_ENABLE.get() =>
buildExtScalarFunction("Spark_StringLower", e.children, e.dataType)
- case e: Upper
- if sparkAuronConfig.getBoolean(SparkAuronConfiguration.CASE_CONVERT_FUNCTIONS_ENABLE) =>
+ case e: Upper if SparkAuronConfiguration.CASE_CONVERT_FUNCTIONS_ENABLE.get() =>
buildExtScalarFunction("Spark_StringUpper", e.children, e.dataType)
case e: StringTrim =>
@@ -1254,7 +1238,7 @@ object NativeConverters extends Logging {
}
// fallback to UDAF
- if (sparkAuronConfig.getBoolean(SparkAuronConfiguration.UDAF_FALLBACK_ENABLE)) {
+ if (SparkAuronConfiguration.UDAF_FALLBACK_ENABLE.get()) {
udaf match {
case _: DeclarativeAggregate =>
case _: TypedImperativeAggregate[_] =>
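
The same migration applies inside pattern guards: FORCE_SHORT_CIRCUIT_AND_OR, CASE_CONVERT_FUNCTIONS_ENABLE and UDAF_FALLBACK_ENABLE are now read straight from the option rather than through the adaptor's AuronConfiguration. A sketch of the guard style used in the And/Or cases, assuming only that option plus Catalyst's And/Or extractors; the helper object and method are illustrative:

    import org.apache.spark.sql.catalyst.expressions.{And, Expression, Or}
    import org.apache.auron.spark.configuration.SparkAuronConfiguration

    object ShortCircuitGuardSketch {
      // True when conversion should prefer the short-circuiting native and/or operators.
      def useShortCircuit(e: Expression): Boolean = e match {
        case And(_, _) | Or(_, _) => SparkAuronConfiguration.FORCE_SHORT_CIRCUIT_AND_OR.get()
        case _ => false
      }
    }
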
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
index 8d817521..19f98b41 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
@@ -292,6 +292,7 @@ object Shims {
lazy val get: Shims = {
classOf[Shims].getClassLoader
.loadClass("org.apache.spark.sql.auron.ShimsImpl")
+ .getConstructor()
.newInstance()
.asInstanceOf[Shims]
}
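
The one-line change above swaps the deprecated Class.newInstance() for an explicit getConstructor().newInstance() call (Class.newInstance() has been deprecated since Java 9 because it can propagate checked constructor exceptions unchecked). A small sketch of the same reflective-loading pattern; the trait and class names are illustrative stand-ins for Shims and ShimsImpl:

    trait Greeter { def greet(): String }
    class HelloGreeter extends Greeter { override def greet(): String = "hello" }

    object ReflectiveLoadSketch {
      def main(args: Array[String]): Unit = {
        val greeter = Class
          .forName(classOf[HelloGreeter].getName)
          .getConstructor()        // resolve the no-arg constructor explicitly
          .newInstance()
          .asInstanceOf[Greeter]
        println(greeter.greet())
      }
    }
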
diff --git
a/spark-extension/src/test/java/org/apache/auron/spark/configuration/SparkAuronConfigurationTest.java
b/spark-extension/src/test/java/org/apache/auron/spark/configuration/SparkAuronConfigurationTest.java
deleted file mode 100644
index 99345250..00000000
---
a/spark-extension/src/test/java/org/apache/auron/spark/configuration/SparkAuronConfigurationTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.auron.spark.configuration;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import org.apache.spark.SparkConf;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-/**
- * This class is used to test the {@link SparkAuronConfiguration) class.
- */
-public class SparkAuronConfigurationTest {
-
- private SparkAuronConfiguration sparkAuronConfiguration;
-
- @BeforeEach
- public void setUp() {
- SparkConf sparkConf = new SparkConf();
- sparkConf.set("spark.auron.ui.enabled", "false");
- sparkConf.set("spark.auron.process.vmrss.memoryFraction", "0.66");
- sparkConf.set("spark.auron.suggested.udaf.memUsedSize", "1024");
- sparkConf.set("spark.io.compression.codec", "gzip");
- sparkAuronConfiguration = new SparkAuronConfiguration(sparkConf);
- }
-
- @Test
- public void testGetSparkConfig() {
- assertEquals(sparkAuronConfiguration.get(SparkAuronConfiguration.UI_ENABLED), false);
- assertEquals(sparkAuronConfiguration.get(SparkAuronConfiguration.PROCESS_MEMORY_FRACTION), 0.66);
- assertEquals(sparkAuronConfiguration.get(SparkAuronConfiguration.SUGGESTED_UDAF_ROW_MEM_USAGE), 1024);
- assertEquals(sparkAuronConfiguration.get(SparkAuronConfiguration.SPARK_IO_COMPRESSION_CODEC), "gzip");
-
- assertEquals(
- sparkAuronConfiguration
- .getOptional(SparkAuronConfiguration.UI_ENABLED.key())
- .get(),
- false);
- assertEquals(
- sparkAuronConfiguration
- .getOptional(SparkAuronConfiguration.PROCESS_MEMORY_FRACTION.key())
- .get(),
- 0.66);
- assertEquals(
- sparkAuronConfiguration
- .getOptional(SparkAuronConfiguration.SUGGESTED_UDAF_ROW_MEM_USAGE.key())
- .get(),
- 1024);
- assertEquals(
- sparkAuronConfiguration
- .getOptional(SparkAuronConfiguration.SPARK_IO_COMPRESSION_CODEC.key())
- .get(),
- "gzip");
-
- // Test default value
- assertEquals(
- sparkAuronConfiguration
- .getOptional(SparkAuronConfiguration.PARSE_JSON_ERROR_FALLBACK)
- .get(),
- true);
- }
-}
diff --git
a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala
b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala
index 5b320d18..f9d75c43 100644
---
a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala
+++
b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala
@@ -24,11 +24,11 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.execution.HiveTableScanExec
import org.apache.spark.sql.hive.execution.auron.plan.NativePaimonTableScanExec
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
+
class PaimonConvertProvider extends AuronConvertProvider with Logging {
- override def isEnabled: Boolean = {
- AuronConverters.getBooleanConf("spark.auron.enable.paimon.scan",
defaultValue = false)
- }
+ override def isEnabled: Boolean = SparkAuronConfiguration.ENABLE_PAIMON_SCAN.get()
override def isSupported(exec: SparkPlan): Boolean = {
exec match {