This is an automated email from the ASF dual-hosted git repository.
beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b3c56cffdb6 [SPARK-45515][CORE][SQL][FOLLOWUP] Use enhanced switch expressions to replace the regular switch statement
b3c56cffdb6 is described below
commit b3c56cffdb6f731a1c8677a6bc896be0144ac0fc
Author: Jiaan Geng <[email protected]>
AuthorDate: Thu Dec 7 13:50:44 2023 +0800
[SPARK-45515][CORE][SQL][FOLLOWUP] Use enhanced switch expressions to replace the regular switch statement
### What changes were proposed in this pull request?
This PR follows up https://github.com/apache/spark/pull/43349.
This PR also does not cover the hive and hive-thriftserver modules.
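For context, the pattern applied throughout the diff below is the enhanced switch introduced by JEP 361 (Java 14+). A minimal before/after sketch, using made-up names that do not appear in this change (assume `day` is a `java.time.DayOfWeek`):

    // Before: classic switch statement; the result is assigned in each arm
    // and every arm needs an explicit break to avoid fall-through.
    String label;
    switch (day) {
      case SATURDAY:
      case SUNDAY:
        label = "weekend";
        break;
      default:
        label = "weekday";
    }

    // After: enhanced switch expression; arrow arms never fall through,
    // labels can be grouped, and a block arm returns its value via yield.
    String label2 = switch (day) {
      case SATURDAY, SUNDAY -> "weekend";
      default -> {
        yield "weekday";
      }
    };

Where a statement becomes an expression the compiler also requires exhaustiveness, which is why a few of the converted methods below gain an explicit default arm (for example `default -> false` in isLazyDecodingSupported).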
### Why are the changes needed?
Please see https://github.com/apache/spark/pull/43349.
### Does this PR introduce _any_ user-facing change?
'No'.
### How was this patch tested?
GA (GitHub Actions).
### Was this patch authored or co-authored using generative AI tooling?
'No'.
Closes #44183 from beliefer/SPARK-45515_followup.
Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Jiaan Geng <[email protected]>
---
.../org/apache/spark/network/util/DBProvider.java | 16 +--
.../apache/spark/network/RpcIntegrationSuite.java | 10 +-
.../java/org/apache/spark/network/StreamSuite.java | 19 ++-
.../shuffle/checksum/ShuffleChecksumHelper.java | 15 +-
.../apache/spark/unsafe/UnsafeAlignedOffset.java | 10 +-
.../apache/spark/launcher/SparkLauncherSuite.java | 13 +-
.../apache/spark/launcher/CommandBuilderUtils.java | 98 +++++++------
.../spark/launcher/SparkClassCommandBuilder.java | 45 +++---
.../spark/launcher/SparkSubmitCommandBuilder.java | 55 +++-----
.../spark/launcher/InProcessLauncherSuite.java | 43 +++---
.../sql/connector/util/V2ExpressionSQLBuilder.java | 154 +++++----------------
.../parquet/ParquetVectorUpdaterFactory.java | 39 +++---
.../parquet/VectorizedColumnReader.java | 50 +++----
.../parquet/VectorizedRleValuesReader.java | 89 +++++-------
14 files changed, 251 insertions(+), 405 deletions(-)
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java b/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java
index 1adb9cfe5d3..5a25bdda233 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java
@@ -38,17 +38,17 @@ public class DBProvider {
StoreVersion version,
ObjectMapper mapper) throws IOException {
if (dbFile != null) {
- switch (dbBackend) {
- case LEVELDB:
+ return switch (dbBackend) {
+ case LEVELDB -> {
org.iq80.leveldb.DB levelDB = LevelDBProvider.initLevelDB(dbFile, version, mapper);
logger.warn("The LEVELDB is deprecated. Please use ROCKSDB instead.");
- return levelDB != null ? new LevelDB(levelDB) : null;
- case ROCKSDB:
+ yield levelDB != null ? new LevelDB(levelDB) : null;
+ }
+ case ROCKSDB -> {
org.rocksdb.RocksDB rocksDB = RocksDBProvider.initRockDB(dbFile, version, mapper);
- return rocksDB != null ? new RocksDB(rocksDB) : null;
- default:
- throw new IllegalArgumentException("Unsupported DBBackend: " + dbBackend);
- }
+ yield rocksDB != null ? new RocksDB(rocksDB) : null;
+ }
+ };
}
return null;
}
diff --git a/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
index 55a0cc73f8b..40495d6912c 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
@@ -67,14 +67,10 @@ public class RpcIntegrationSuite {
String msg = JavaUtils.bytesToString(message);
String[] parts = msg.split("/");
switch (parts[0]) {
- case "hello":
+ case "hello" ->
callback.onSuccess(JavaUtils.stringToBytes("Hello, " + parts[1] + "!"));
- break;
- case "return error":
- callback.onFailure(new RuntimeException("Returned: " + parts[1]));
- break;
- case "throw error":
- throw new RuntimeException("Thrown: " + parts[1]);
+ case "return error" -> callback.onFailure(new
RuntimeException("Returned: " + parts[1]));
+ case "throw error" -> throw new RuntimeException("Thrown: " +
parts[1]);
}
}
diff --git a/common/network-common/src/test/java/org/apache/spark/network/StreamSuite.java b/common/network-common/src/test/java/org/apache/spark/network/StreamSuite.java
index 096dfc8b1cf..4f4637e302b 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/StreamSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/StreamSuite.java
@@ -185,27 +185,26 @@ public class StreamSuite {
ByteArrayOutputStream baos = null;
switch (streamId) {
- case "largeBuffer":
+ case "largeBuffer" -> {
baos = new ByteArrayOutputStream();
out = baos;
srcBuffer = testData.largeBuffer;
- break;
- case "smallBuffer":
+ }
+ case "smallBuffer" -> {
baos = new ByteArrayOutputStream();
out = baos;
srcBuffer = testData.smallBuffer;
- break;
- case "file":
+ }
+ case "file" -> {
outFile = File.createTempFile("data", ".tmp", testData.tempDir);
out = new FileOutputStream(outFile);
- break;
- case "emptyBuffer":
+ }
+ case "emptyBuffer" -> {
baos = new ByteArrayOutputStream();
out = baos;
srcBuffer = testData.emptyBuffer;
- break;
- default:
- throw new IllegalArgumentException(streamId);
+ }
+ default -> throw new IllegalArgumentException(streamId);
}
TestCallback callback = new TestCallback(out);
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
index 6993be4c430..1feac49752c 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
@@ -50,24 +50,25 @@ public class ShuffleChecksumHelper {
private static Checksum[] getChecksumsByAlgorithm(int num, String algorithm) {
Checksum[] checksums;
switch (algorithm) {
- case "ADLER32":
+ case "ADLER32" -> {
checksums = new Adler32[num];
for (int i = 0; i < num; i++) {
checksums[i] = new Adler32();
}
- return checksums;
+ }
- case "CRC32":
+ case "CRC32" -> {
checksums = new CRC32[num];
for (int i = 0; i < num; i++) {
checksums[i] = new CRC32();
}
- return checksums;
+ }
- default:
- throw new UnsupportedOperationException(
- "Unsupported shuffle checksum algorithm: " + algorithm);
+ default -> throw new UnsupportedOperationException(
+ "Unsupported shuffle checksum algorithm: " + algorithm);
}
+
+ return checksums;
}
public static Checksum getChecksumByAlgorithm(String algorithm) {
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/UnsafeAlignedOffset.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/UnsafeAlignedOffset.java
index 2dfd1ecb635..fcb3ab9e6c1 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/UnsafeAlignedOffset.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/UnsafeAlignedOffset.java
@@ -53,13 +53,9 @@ public class UnsafeAlignedOffset {
public static void putSize(Object object, long offset, int value) {
switch (getUaoSize()) {
- case 4:
- Platform.putInt(object, offset, value);
- break;
- case 8:
- Platform.putLong(object, offset, value);
- break;
- default:
+ case 4 -> Platform.putInt(object, offset, value);
+ case 8 -> Platform.putLong(object, offset, value);
+ default ->
// checkstyle.off: RegexpSinglelineJava
throw new AssertionError("Illegal UAO_SIZE");
// checkstyle.on: RegexpSinglelineJava
diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index cb132f1050d..f0da1eb10f8 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -187,16 +187,9 @@ public class SparkLauncherSuite extends BaseSuite {
InProcessLauncher launcher = new InProcessLauncher()
.setAppResource(SparkLauncher.NO_RESOURCE);
switch (args.length) {
- case 2:
- launcher.addSparkArg(args[0], args[1]);
- break;
-
- case 1:
- launcher.addSparkArg(args[0]);
- break;
-
- default:
- fail("FIXME: invalid test.");
+ case 2 -> launcher.addSparkArg(args[0], args[1]);
+ case 1 -> launcher.addSparkArg(args[0]);
+ default -> fail("FIXME: invalid test.");
}
SparkAppHandle handle = launcher.startApplication();
diff --git a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
index 172fb8c5608..eb57d43c853 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
@@ -143,59 +143,61 @@ class CommandBuilderUtils {
escapeNext = false;
} else if (inOpt) {
switch (c) {
- case '\\':
- if (inSingleQuote) {
- opt.appendCodePoint(c);
- } else {
- escapeNext = true;
+ case '\\' -> {
+ if (inSingleQuote) {
+ opt.appendCodePoint(c);
+ } else {
+ escapeNext = true;
+ }
}
- break;
- case '\'':
- if (inDoubleQuote) {
- opt.appendCodePoint(c);
- } else {
- inSingleQuote = !inSingleQuote;
+ case '\'' -> {
+ if (inDoubleQuote) {
+ opt.appendCodePoint(c);
+ } else {
+ inSingleQuote = !inSingleQuote;
+ }
}
- break;
- case '"':
- if (inSingleQuote) {
- opt.appendCodePoint(c);
- } else {
- inDoubleQuote = !inDoubleQuote;
+ case '"' -> {
+ if (inSingleQuote) {
+ opt.appendCodePoint(c);
+ } else {
+ inDoubleQuote = !inDoubleQuote;
+ }
}
- break;
- default:
- if (!Character.isWhitespace(c) || inSingleQuote || inDoubleQuote) {
- opt.appendCodePoint(c);
- } else {
- opts.add(opt.toString());
- opt.setLength(0);
- inOpt = false;
- hasData = false;
+ default -> {
+ if (!Character.isWhitespace(c) || inSingleQuote || inDoubleQuote) {
+ opt.appendCodePoint(c);
+ } else {
+ opts.add(opt.toString());
+ opt.setLength(0);
+ inOpt = false;
+ hasData = false;
+ }
}
}
} else {
switch (c) {
- case '\'':
- inSingleQuote = true;
- inOpt = true;
- hasData = true;
- break;
- case '"':
- inDoubleQuote = true;
- inOpt = true;
- hasData = true;
- break;
- case '\\':
- escapeNext = true;
- inOpt = true;
- hasData = true;
- break;
- default:
- if (!Character.isWhitespace(c)) {
+ case '\'' -> {
+ inSingleQuote = true;
+ inOpt = true;
+ hasData = true;
+ }
+ case '"' -> {
+ inDoubleQuote = true;
+ inOpt = true;
+ hasData = true;
+ }
+ case '\\' -> {
+ escapeNext = true;
inOpt = true;
hasData = true;
- opt.appendCodePoint(c);
+ }
+ default -> {
+ if (!Character.isWhitespace(c)) {
+ inOpt = true;
+ hasData = true;
+ opt.appendCodePoint(c);
+ }
}
}
}
@@ -256,12 +258,8 @@ class CommandBuilderUtils {
for (int i = 0; i < arg.length(); i++) {
int cp = arg.codePointAt(i);
switch (cp) {
- case '"':
- quoted.append('"');
- break;
-
- default:
- break;
+ case '"' -> quoted.append('"');
+ default -> {}
}
quoted.appendCodePoint(cp);
}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
index 4ea9cd5a44d..8d95bc06d7a 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
@@ -44,49 +44,46 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder {
public List<String> buildCommand(Map<String, String> env)
throws IOException, IllegalArgumentException {
List<String> javaOptsKeys = new ArrayList<>();
- String memKey = null;
String extraClassPath = null;
// Master, Worker, HistoryServer, ExternalShuffleService use
// SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
- switch (className) {
- case "org.apache.spark.deploy.master.Master":
+ String memKey = switch (className) {
+ case "org.apache.spark.deploy.master.Master" -> {
javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
javaOptsKeys.add("SPARK_MASTER_OPTS");
extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
- memKey = "SPARK_DAEMON_MEMORY";
- break;
- case "org.apache.spark.deploy.worker.Worker":
+ yield "SPARK_DAEMON_MEMORY";
+ }
+ case "org.apache.spark.deploy.worker.Worker" -> {
javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
javaOptsKeys.add("SPARK_WORKER_OPTS");
extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
- memKey = "SPARK_DAEMON_MEMORY";
- break;
- case "org.apache.spark.deploy.history.HistoryServer":
+ yield "SPARK_DAEMON_MEMORY";
+ }
+ case "org.apache.spark.deploy.history.HistoryServer" -> {
javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
javaOptsKeys.add("SPARK_HISTORY_OPTS");
extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
- memKey = "SPARK_DAEMON_MEMORY";
- break;
- case "org.apache.spark.executor.CoarseGrainedExecutorBackend":
+ yield "SPARK_DAEMON_MEMORY";
+ }
+ case "org.apache.spark.executor.CoarseGrainedExecutorBackend" -> {
javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
- memKey = "SPARK_EXECUTOR_MEMORY";
extraClassPath = getenv("SPARK_EXECUTOR_CLASSPATH");
- break;
- case "org.apache.spark.deploy.ExternalShuffleService":
+ yield "SPARK_EXECUTOR_MEMORY";
+ }
+ case "org.apache.spark.deploy.ExternalShuffleService" -> {
javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
javaOptsKeys.add("SPARK_SHUFFLE_OPTS");
extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
- memKey = "SPARK_DAEMON_MEMORY";
- break;
- case "org.apache.hive.beeline.BeeLine":
+ yield "SPARK_DAEMON_MEMORY";
+ }
+ case "org.apache.hive.beeline.BeeLine" -> {
javaOptsKeys.add("SPARK_BEELINE_OPTS");
- memKey = "SPARK_BEELINE_MEMORY";
- break;
- default:
- memKey = "SPARK_DRIVER_MEMORY";
- break;
- }
+ yield "SPARK_BEELINE_MEMORY";
+ }
+ default -> "SPARK_DRIVER_MEMORY";
+ };
List<String> cmd = buildJavaCommand(extraClassPath);
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index 6c103d3cdf9..eb2fb1515cb 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -490,37 +490,21 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
@Override
protected boolean handle(String opt, String value) {
switch (opt) {
- case MASTER:
- master = value;
- break;
- case REMOTE:
- remote = value;
- break;
- case DEPLOY_MODE:
- deployMode = value;
- break;
- case PROPERTIES_FILE:
- propertiesFile = value;
- break;
- case DRIVER_MEMORY:
- conf.put(SparkLauncher.DRIVER_MEMORY, value);
- break;
- case DRIVER_JAVA_OPTIONS:
- conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
- break;
- case DRIVER_LIBRARY_PATH:
- conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
- break;
- case DRIVER_CLASS_PATH:
- conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value);
- break;
- case CONF:
+ case MASTER -> master = value;
+ case REMOTE -> remote = value;
+ case DEPLOY_MODE -> deployMode = value;
+ case PROPERTIES_FILE -> propertiesFile = value;
+ case DRIVER_MEMORY -> conf.put(SparkLauncher.DRIVER_MEMORY, value);
+ case DRIVER_JAVA_OPTIONS -> conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
+ case DRIVER_LIBRARY_PATH -> conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
+ case DRIVER_CLASS_PATH -> conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value);
+ case CONF -> {
checkArgument(value != null, "Missing argument to %s", CONF);
String[] setConf = value.split("=", 2);
checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
conf.put(setConf[0], setConf[1]);
- break;
- case CLASS:
+ }
+ case CLASS -> {
// The special classes require some special command line handling, since they allow
// mixing spark-submit arguments with arguments that should be propagated to the shell
// itself. Note that for this to work, the "--class" argument must come before any
@@ -530,25 +514,22 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
allowsMixedArguments = true;
appResource = specialClasses.get(value);
}
- break;
- case KILL_SUBMISSION:
- case STATUS:
+ }
+ case KILL_SUBMISSION, STATUS -> {
isSpecialCommand = true;
parsedArgs.add(opt);
parsedArgs.add(value);
- break;
- case HELP:
- case USAGE_ERROR:
- case VERSION:
+ }
+ case HELP, USAGE_ERROR, VERSION -> {
isSpecialCommand = true;
parsedArgs.add(opt);
- break;
- default:
+ }
+ default -> {
parsedArgs.add(opt);
if (value != null) {
parsedArgs.add(value);
}
- break;
+ }
}
return true;
}
diff --git a/launcher/src/test/java/org/apache/spark/launcher/InProcessLauncherSuite.java b/launcher/src/test/java/org/apache/spark/launcher/InProcessLauncherSuite.java
index d901255fb13..3a29e7c6df8 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/InProcessLauncherSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/InProcessLauncherSuite.java
@@ -94,16 +94,11 @@ public class InProcessLauncherSuite extends BaseSuite {
if (opt == CONF) {
String[] conf = value.split("=");
switch(conf[0]) {
- case LauncherProtocol.CONF_LAUNCHER_PORT:
- port.set(conf[1]);
- break;
-
- case LauncherProtocol.CONF_LAUNCHER_SECRET:
- secret.set(conf[1]);
- break;
-
- default:
+ case LauncherProtocol.CONF_LAUNCHER_PORT -> port.set(conf[1]);
+ case LauncherProtocol.CONF_LAUNCHER_SECRET -> secret.set(conf[1]);
+ default -> {
// no op
+ }
}
}
@@ -128,25 +123,19 @@ public class InProcessLauncherSuite extends BaseSuite {
String test = args[args.length - 1];
switch (test) {
- case TEST_SUCCESS:
- break;
-
- case TEST_FAILURE:
- throw new IllegalStateException(TEST_FAILURE_MESSAGE);
-
- case TEST_KILL:
- try {
- // Wait for a reasonable amount of time to avoid the test hanging forever on failure,
- // but still allowing for time outs to hopefully not occur on busy machines.
- Thread.sleep(10000);
- fail("Did not get expected interrupt after 10s.");
- } catch (InterruptedException ie) {
- // Expected.
+ case TEST_SUCCESS -> {}
+ case TEST_FAILURE -> throw new IllegalStateException(TEST_FAILURE_MESSAGE);
+ case TEST_KILL -> {
+ try {
+ // Wait for a reasonable amount of time to avoid the test hanging forever on failure,
+ // but still allowing for time outs to hopefully not occur on busy machines.
+ Thread.sleep(10000);
+ fail("Did not get expected interrupt after 10s.");
+ } catch (InterruptedException ie) {
+ // Expected.
+ }
}
- break;
-
- default:
- fail("Unknown test " + test);
+ default -> fail("Unknown test " + test);
}
} catch (Throwable t) {
lastError = t;
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
index 30ca02a36f1..5cd28f1c259 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
@@ -60,17 +60,10 @@ public class V2ExpressionSQLBuilder {
for (char c : str.toCharArray()) {
switch (c) {
- case '_':
- builder.append("\\_");
- break;
- case '%':
- builder.append("\\%");
- break;
- case '\'':
- builder.append("\\\'");
- break;
- default:
- builder.append(c);
+ case '_' -> builder.append("\\_");
+ case '%' -> builder.append("\\%");
+ case '\'' -> builder.append("\\\'");
+ default -> builder.append(c);
}
}
@@ -91,121 +84,48 @@ public class V2ExpressionSQLBuilder {
build(sortOrder.expression()), sortOrder.direction(), sortOrder.nullOrdering());
} else if (expr instanceof GeneralScalarExpression e) {
String name = e.name();
- switch (name) {
- case "IN": {
+ return switch (name) {
+ case "IN" -> {
Expression[] expressions = e.children();
List<String> children = expressionsToStringList(expressions, 1, expressions.length - 1);
- return visitIn(build(expressions[0]), children);
+ yield visitIn(build(expressions[0]), children);
}
- case "IS_NULL":
- return visitIsNull(build(e.children()[0]));
- case "IS_NOT_NULL":
- return visitIsNotNull(build(e.children()[0]));
- case "STARTS_WITH":
- return visitStartsWith(build(e.children()[0]), build(e.children()[1]));
- case "ENDS_WITH":
- return visitEndsWith(build(e.children()[0]), build(e.children()[1]));
- case "CONTAINS":
- return visitContains(build(e.children()[0]), build(e.children()[1]));
- case "=":
- case "<>":
- case "<=>":
- case "<":
- case "<=":
- case ">":
- case ">=":
- return visitBinaryComparison(
- name, inputToSQL(e.children()[0]), inputToSQL(e.children()[1]));
- case "+":
- case "*":
- case "/":
- case "%":
- case "&":
- case "|":
- case "^":
- return visitBinaryArithmetic(
- name, inputToSQL(e.children()[0]), inputToSQL(e.children()[1]));
- case "-":
+ case "IS_NULL" -> visitIsNull(build(e.children()[0]));
+ case "IS_NOT_NULL" -> visitIsNotNull(build(e.children()[0]));
+ case "STARTS_WITH" -> visitStartsWith(build(e.children()[0]),
build(e.children()[1]));
+ case "ENDS_WITH" -> visitEndsWith(build(e.children()[0]),
build(e.children()[1]));
+ case "CONTAINS" -> visitContains(build(e.children()[0]),
build(e.children()[1]));
+ case "=", "<>", "<=>", "<", "<=", ">", ">=" ->
+ visitBinaryComparison(name, inputToSQL(e.children()[0]), inputToSQL(e.children()[1]));
+ case "+", "*", "/", "%", "&", "|", "^" ->
+ visitBinaryArithmetic(name, inputToSQL(e.children()[0]), inputToSQL(e.children()[1]));
+ case "-" -> {
if (e.children().length == 1) {
- return visitUnaryArithmetic(name, inputToSQL(e.children()[0]));
+ yield visitUnaryArithmetic(name, inputToSQL(e.children()[0]));
} else {
- return visitBinaryArithmetic(
+ yield visitBinaryArithmetic(
name, inputToSQL(e.children()[0]), inputToSQL(e.children()[1]));
}
- case "AND":
- return visitAnd(name, build(e.children()[0]), build(e.children()[1]));
- case "OR":
- return visitOr(name, build(e.children()[0]), build(e.children()[1]));
- case "NOT":
- return visitNot(build(e.children()[0]));
- case "~":
- return visitUnaryArithmetic(name, inputToSQL(e.children()[0]));
- case "ABS":
- case "COALESCE":
- case "GREATEST":
- case "LEAST":
- case "RAND":
- case "LOG":
- case "LOG10":
- case "LOG2":
- case "LN":
- case "EXP":
- case "POWER":
- case "SQRT":
- case "FLOOR":
- case "CEIL":
- case "ROUND":
- case "SIN":
- case "SINH":
- case "COS":
- case "COSH":
- case "TAN":
- case "TANH":
- case "COT":
- case "ASIN":
- case "ASINH":
- case "ACOS":
- case "ACOSH":
- case "ATAN":
- case "ATANH":
- case "ATAN2":
- case "CBRT":
- case "DEGREES":
- case "RADIANS":
- case "SIGN":
- case "WIDTH_BUCKET":
- case "SUBSTRING":
- case "UPPER":
- case "LOWER":
- case "TRANSLATE":
- case "DATE_ADD":
- case "DATE_DIFF":
- case "TRUNC":
- case "AES_ENCRYPT":
- case "AES_DECRYPT":
- case "SHA1":
- case "SHA2":
- case "MD5":
- case "CRC32":
- case "BIT_LENGTH":
- case "CHAR_LENGTH":
- case "CONCAT":
- return visitSQLFunction(name, expressionsToStringArray(e.children()));
- case "CASE_WHEN": {
- return visitCaseWhen(expressionsToStringArray(e.children()));
}
- case "TRIM":
- return visitTrim("BOTH", expressionsToStringArray(e.children()));
- case "LTRIM":
- return visitTrim("LEADING", expressionsToStringArray(e.children()));
- case "RTRIM":
- return visitTrim("TRAILING", expressionsToStringArray(e.children()));
- case "OVERLAY":
- return visitOverlay(expressionsToStringArray(e.children()));
+ case "AND" -> visitAnd(name, build(e.children()[0]),
build(e.children()[1]));
+ case "OR" -> visitOr(name, build(e.children()[0]),
build(e.children()[1]));
+ case "NOT" -> visitNot(build(e.children()[0]));
+ case "~" -> visitUnaryArithmetic(name, inputToSQL(e.children()[0]));
+ case "ABS", "COALESCE", "GREATEST", "LEAST", "RAND", "LOG", "LOG10",
"LOG2", "LN", "EXP",
+ "POWER", "SQRT", "FLOOR", "CEIL", "ROUND", "SIN", "SINH", "COS",
"COSH", "TAN", "TANH",
+ "COT", "ASIN", "ASINH", "ACOS", "ACOSH", "ATAN", "ATANH", "ATAN2",
"CBRT", "DEGREES",
+ "RADIANS", "SIGN", "WIDTH_BUCKET", "SUBSTRING", "UPPER", "LOWER",
"TRANSLATE",
+ "DATE_ADD", "DATE_DIFF", "TRUNC", "AES_ENCRYPT", "AES_DECRYPT",
"SHA1", "SHA2", "MD5",
+ "CRC32", "BIT_LENGTH", "CHAR_LENGTH", "CONCAT" ->
+ visitSQLFunction(name, expressionsToStringArray(e.children()));
+ case "CASE_WHEN" ->
visitCaseWhen(expressionsToStringArray(e.children()));
+ case "TRIM" -> visitTrim("BOTH",
expressionsToStringArray(e.children()));
+ case "LTRIM" -> visitTrim("LEADING",
expressionsToStringArray(e.children()));
+ case "RTRIM" -> visitTrim("TRAILING",
expressionsToStringArray(e.children()));
+ case "OVERLAY" -> visitOverlay(expressionsToStringArray(e.children()));
// TODO supports other expressions
- default:
- return visitUnexpectedExpr(expr);
- }
+ default -> visitUnexpectedExpr(expr);
+ };
} else if (expr instanceof Min min) {
return visitAggregateFunction("MIN", false,
expressionsToStringArray(min.children()));
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 54b96bfb873..1ed6c4329eb 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -68,12 +68,12 @@ public class ParquetVectorUpdaterFactory {
PrimitiveType.PrimitiveTypeName typeName =
descriptor.getPrimitiveType().getPrimitiveTypeName();
switch (typeName) {
- case BOOLEAN:
+ case BOOLEAN -> {
if (sparkType == DataTypes.BooleanType) {
return new BooleanUpdater();
}
- break;
- case INT32:
+ }
+ case INT32 -> {
if (sparkType == DataTypes.IntegerType ||
canReadAsIntDecimal(descriptor, sparkType)) {
return new IntegerUpdater();
} else if (sparkType == DataTypes.LongType &&
isUnsignedIntTypeMatched(32)) {
@@ -95,8 +95,8 @@ public class ParquetVectorUpdaterFactory {
} else if (sparkType instanceof YearMonthIntervalType) {
return new IntegerUpdater();
}
- break;
- case INT64:
+ }
+ case INT64 -> {
// This is where we implement support for the valid type conversions.
if (sparkType == DataTypes.LongType ||
canReadAsLongDecimal(descriptor, sparkType)) {
if (DecimalType.is32BitDecimalType(sparkType)) {
@@ -110,7 +110,7 @@ public class ParquetVectorUpdaterFactory {
// fallbacks. We read them as decimal values.
return new UnsignedLongUpdater();
} else if (isTimestamp(sparkType) &&
- isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
+ isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
validateTimestampType(sparkType);
if ("CORRECTED".equals(datetimeRebaseMode)) {
return new LongUpdater();
@@ -119,7 +119,7 @@ public class ParquetVectorUpdaterFactory {
return new LongWithRebaseUpdater(failIfRebase, datetimeRebaseTz);
}
} else if (isTimestamp(sparkType) &&
- isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
+ isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
validateTimestampType(sparkType);
if ("CORRECTED".equals(datetimeRebaseMode)) {
return new LongAsMicrosUpdater();
@@ -130,18 +130,18 @@ public class ParquetVectorUpdaterFactory {
} else if (sparkType instanceof DayTimeIntervalType) {
return new LongUpdater();
}
- break;
- case FLOAT:
+ }
+ case FLOAT -> {
if (sparkType == DataTypes.FloatType) {
return new FloatUpdater();
}
- break;
- case DOUBLE:
+ }
+ case DOUBLE -> {
if (sparkType == DataTypes.DoubleType) {
return new DoubleUpdater();
}
- break;
- case INT96:
+ }
+ case INT96 -> {
if (sparkType == DataTypes.TimestampNTZType) {
convertErrorForTimestampNTZ(typeName.name());
} else if (sparkType == DataTypes.TimestampType) {
@@ -163,14 +163,14 @@ public class ParquetVectorUpdaterFactory {
}
}
}
- break;
- case BINARY:
+ }
+ case BINARY -> {
if (sparkType == DataTypes.StringType || sparkType == DataTypes.BinaryType ||
canReadAsBinaryDecimal(descriptor, sparkType)) {
return new BinaryUpdater();
}
- break;
- case FIXED_LEN_BYTE_ARRAY:
+ }
+ case FIXED_LEN_BYTE_ARRAY -> {
int arrayLen = descriptor.getPrimitiveType().getTypeLength();
if (canReadAsIntDecimal(descriptor, sparkType)) {
return new FixedLenByteArrayAsIntUpdater(arrayLen);
@@ -181,9 +181,8 @@ public class ParquetVectorUpdaterFactory {
} else if (sparkType == DataTypes.BinaryType) {
return new FixedLenByteArrayUpdater(arrayLen);
}
- break;
- default:
- break;
+ }
+ default -> {}
}
// If we get here, it means the combination of Spark and Parquet type is invalid or not
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index f185b251ed9..04fbe716ad9 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -141,26 +141,20 @@ public class VectorizedColumnReader {
}
private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName) {
- boolean isSupported = false;
- switch (typeName) {
- case INT32:
- isSupported = !(logicalTypeAnnotation instanceof DateLogicalTypeAnnotation) ||
+ return switch (typeName) {
+ case INT32 ->
+ !(logicalTypeAnnotation instanceof DateLogicalTypeAnnotation) ||
"CORRECTED".equals(datetimeRebaseMode);
- break;
- case INT64:
+ case INT64 -> {
if (updaterFactory.isTimestampTypeMatched(TimeUnit.MICROS)) {
- isSupported = "CORRECTED".equals(datetimeRebaseMode);
+ yield "CORRECTED".equals(datetimeRebaseMode);
} else {
- isSupported = !updaterFactory.isTimestampTypeMatched(TimeUnit.MILLIS);
+ yield !updaterFactory.isTimestampTypeMatched(TimeUnit.MILLIS);
}
- break;
- case FLOAT:
- case DOUBLE:
- case BINARY:
- isSupported = true;
- break;
- }
- return isSupported;
+ }
+ case FLOAT, DOUBLE, BINARY -> true;
+ default -> false;
+ };
}
/**
@@ -325,28 +319,24 @@ public class VectorizedColumnReader {
}
private ValuesReader getValuesReader(Encoding encoding) {
- switch (encoding) {
- case PLAIN:
- return new VectorizedPlainValuesReader();
- case DELTA_BYTE_ARRAY:
- return new VectorizedDeltaByteArrayReader();
- case DELTA_LENGTH_BYTE_ARRAY:
- return new VectorizedDeltaLengthByteArrayReader();
- case DELTA_BINARY_PACKED:
- return new VectorizedDeltaBinaryPackedReader();
- case RLE:
+ return switch (encoding) {
+ case PLAIN -> new VectorizedPlainValuesReader();
+ case DELTA_BYTE_ARRAY -> new VectorizedDeltaByteArrayReader();
+ case DELTA_LENGTH_BYTE_ARRAY -> new VectorizedDeltaLengthByteArrayReader();
+ case DELTA_BINARY_PACKED -> new VectorizedDeltaBinaryPackedReader();
+ case RLE -> {
PrimitiveType.PrimitiveTypeName typeName =
this.descriptor.getPrimitiveType().getPrimitiveTypeName();
// RLE encoding only supports boolean type `Values`, and `bitwidth` is always 1.
if (typeName == BOOLEAN) {
- return new VectorizedRleValuesReader(1);
+ yield new VectorizedRleValuesReader(1);
} else {
throw new UnsupportedOperationException(
"RLE encoding is not supported for values of type: " + typeName);
}
- default:
- throw new UnsupportedOperationException("Unsupported encoding: " +
encoding);
- }
+ }
+ default -> throw new UnsupportedOperationException("Unsupported
encoding: " + encoding);
+ };
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index 58add7bc04e..584aaa2d118 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -236,15 +236,15 @@ public final class VectorizedRleValuesReader extends ValuesReader
n = (int) (end - start + 1);
switch (mode) {
- case RLE:
+ case RLE -> {
if (currentValue == state.maxDefinitionLevel) {
updater.readValues(n, state.valueOffset, values, valueReader);
} else {
nulls.putNulls(state.valueOffset, n);
}
state.valueOffset += n;
- break;
- case PACKED:
+ }
+ case PACKED -> {
for (int i = 0; i < n; ++i) {
int currentValue = currentBuffer[currentBufferIdx++];
if (currentValue == state.maxDefinitionLevel) {
@@ -253,7 +253,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
nulls.putNull(state.valueOffset++);
}
}
- break;
+ }
}
state.levelOffset += n;
leftInBatch -= n;
@@ -404,7 +404,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
long rangeEnd = state.currentRangeEnd();
switch (mode) {
- case RLE:
+ case RLE -> {
// This RLE block is consist of top-level rows, so we'll need to check
// if the rows should be skipped according to row indexes.
if (currentValue == 0) {
@@ -462,8 +462,8 @@ public final class VectorizedRleValuesReader extends ValuesReader
leftInPage -= valuesLeftInBlock;
currentCount -= valuesLeftInBlock;
}
- break;
- case PACKED:
+ }
+ case PACKED -> {
int i = 0;
for (; i < valuesLeftInBlock; i++) {
@@ -498,7 +498,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
leftInPage -= i;
currentCount -= i;
currentBufferIdx += i;
- break;
+ }
}
}
@@ -626,7 +626,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
VectorizedValuesReader valueReader,
ParquetVectorUpdater updater) {
switch (mode) {
- case RLE:
+ case RLE -> {
if (currentValue == state.maxDefinitionLevel) {
updater.readValues(n, state.valueOffset, values, valueReader);
} else {
@@ -634,8 +634,8 @@ public final class VectorizedRleValuesReader extends ValuesReader
}
state.valueOffset += n;
defLevels.putInts(state.levelOffset, n, currentValue);
- break;
- case PACKED:
+ }
+ case PACKED -> {
for (int i = 0; i < n; ++i) {
int currentValue = currentBuffer[currentBufferIdx++];
if (currentValue == state.maxDefinitionLevel) {
@@ -645,7 +645,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
}
defLevels.putInt(state.levelOffset + i, currentValue);
}
- break;
+ }
}
}
@@ -662,14 +662,14 @@ public final class VectorizedRleValuesReader extends ValuesReader
if (currentCount == 0 && !readNextGroup()) break;
int num = Math.min(n, this.currentCount);
switch (mode) {
- case RLE:
+ case RLE -> {
// We only need to skip non-null values from `valuesReader` since nulls are represented
// via definition levels which are skipped here via decrementing `currentCount`.
if (currentValue == state.maxDefinitionLevel) {
updater.skipValues(num, valuesReader);
}
- break;
- case PACKED:
+ }
+ case PACKED -> {
int totalSkipNum = 0;
for (int i = 0; i < num; ++i) {
// Same as above, only skip non-null values from `valuesReader`
@@ -678,7 +678,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
}
}
updater.skipValues(totalSkipNum, valuesReader);
- break;
+ }
}
currentCount -= num;
n -= num;
@@ -695,13 +695,11 @@ public final class VectorizedRleValuesReader extends ValuesReader
if (currentCount == 0 && !readNextGroup()) break;
int n = Math.min(left, this.currentCount);
switch (mode) {
- case RLE:
- c.putInts(rowId, n, currentValue);
- break;
- case PACKED:
+ case RLE -> c.putInts(rowId, n, currentValue);
+ case PACKED -> {
c.putInts(rowId, n, currentBuffer, currentBufferIdx);
currentBufferIdx += n;
- break;
+ }
}
rowId += n;
left -= n;
@@ -772,15 +770,13 @@ public final class VectorizedRleValuesReader extends ValuesReader
if (this.currentCount == 0) this.readNextGroup();
int n = Math.min(left, this.currentCount);
switch (mode) {
- case RLE:
- c.putBooleans(rowId, n, currentValue != 0);
- break;
- case PACKED:
+ case RLE -> c.putBooleans(rowId, n, currentValue != 0);
+ case PACKED -> {
for (int i = 0; i < n; ++i) {
// For Boolean types, `currentBuffer[currentBufferIdx++]` can only be 0 or 1
c.putByte(rowId + i, (byte) currentBuffer[currentBufferIdx++]);
}
- break;
+ }
}
rowId += n;
left -= n;
@@ -878,27 +874,23 @@ public final class VectorizedRleValuesReader extends ValuesReader
* Reads the next byteWidth little endian int.
*/
private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
- switch (bytesWidth) {
- case 0:
- return 0;
- case 1:
- return in.read();
- case 2: {
+ return switch (bytesWidth) {
+ case 0 -> 0;
+ case 1 -> in.read();
+ case 2 -> {
int ch2 = in.read();
int ch1 = in.read();
- return (ch1 << 8) + ch2;
+ yield (ch1 << 8) + ch2;
}
- case 3: {
+ case 3 -> {
int ch3 = in.read();
int ch2 = in.read();
int ch1 = in.read();
- return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
+ yield (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
}
- case 4: {
- return readIntLittleEndian();
- }
- }
- throw new RuntimeException("Unreachable");
+ case 4 -> readIntLittleEndian();
+ default -> throw new RuntimeException("Unreachable");
+ };
}
/**
@@ -914,11 +906,11 @@ public final class VectorizedRleValuesReader extends ValuesReader
int header = readUnsignedVarInt();
this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
switch (mode) {
- case RLE:
+ case RLE -> {
this.currentCount = header >>> 1;
this.currentValue = readIntLittleEndianPaddedOnBitWidth();
- break;
- case PACKED:
+ }
+ case PACKED -> {
int numGroups = header >>> 1;
this.currentCount = numGroups * 8;
@@ -933,9 +925,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
this.packer.unpack8Values(buffer, buffer.position(), this.currentBuffer, valueIndex);
valueIndex += 8;
}
- break;
- default:
- throw new ParquetDecodingException("not a valid mode " + this.mode);
+ }
}
} catch (IOException e) {
throw new ParquetDecodingException("Failed to read from input stream",
e);
@@ -953,11 +943,8 @@ public final class VectorizedRleValuesReader extends ValuesReader
if (this.currentCount == 0 && !readNextGroup()) break;
int num = Math.min(left, this.currentCount);
switch (mode) {
- case RLE:
- break;
- case PACKED:
- currentBufferIdx += num;
- break;
+ case RLE -> {}
+ case PACKED -> currentBufferIdx += num;
}
currentCount -= num;
left -= num;