This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b3c56cffdb6 [SPARK-45515][CORE][SQL][FOLLOWUP] Use enhanced switch expressions to replace the regular switch statement
b3c56cffdb6 is described below

commit b3c56cffdb6f731a1c8677a6bc896be0144ac0fc
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Thu Dec 7 13:50:44 2023 +0800

    [SPARK-45515][CORE][SQL][FOLLOWUP] Use enhanced switch expressions to replace the regular switch statement

    ### What changes were proposed in this pull request?
    This PR follows up https://github.com/apache/spark/pull/43349.
    Like that PR, it does not cover the hive and hive-thriftserver modules.

    ### Why are the changes needed?
    Please see https://github.com/apache/spark/pull/43349.

    ### Does this PR introduce _any_ user-facing change?
    'No'.

    ### How was this patch tested?
    GA

    ### Was this patch authored or co-authored using generative AI tooling?
    'No'.

    Closes #44183 from beliefer/SPARK-45515_followup.

    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Jiaan Geng <belie...@163.com>
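The pattern applied throughout this commit is the arrow-form switch
expression standardized in Java 14 (JEP 361). A minimal before/after
sketch of the shape of the change, using a hypothetical Mode enum for
illustration rather than code taken from the patch:

    enum Mode { RLE, PACKED }

    // Before: statement form. Each case needs an explicit break to
    // prevent fall-through, and the result flows through a mutable local.
    static String describeOld(Mode mode) {
      String label;
      switch (mode) {
        case RLE:
          label = "run-length encoded";
          break;
        case PACKED:
          label = "bit-packed";
          break;
        default:
          throw new IllegalArgumentException("Unknown mode: " + mode);
      }
      return label;
    }

    // After: expression form. Arrow cases never fall through, and a
    // switch expression must be exhaustive, so an enum switch that names
    // every constant needs no default branch.
    static String describeNew(Mode mode) {
      return switch (mode) {
        case RLE -> "run-length encoded";
        case PACKED -> {
          // A block-bodied arm produces its value with yield.
          yield "bit-packed";
        }
      };
    }

Exhaustiveness is also why some hunks below drop a default branch
(DBProvider covers every DBBackend constant) while others gain one
(isLazyDecodingSupported switches over the larger PrimitiveTypeName
enum, so it adds a default -> false arm).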
---
 .../org/apache/spark/network/util/DBProvider.java  |  16 +--
 .../apache/spark/network/RpcIntegrationSuite.java  |  10 +-
 .../java/org/apache/spark/network/StreamSuite.java |  19 ++-
 .../shuffle/checksum/ShuffleChecksumHelper.java    |  15 +-
 .../apache/spark/unsafe/UnsafeAlignedOffset.java   |  10 +-
 .../apache/spark/launcher/SparkLauncherSuite.java  |  13 +-
 .../apache/spark/launcher/CommandBuilderUtils.java |  98 +++++++------
 .../spark/launcher/SparkClassCommandBuilder.java   |  45 +++---
 .../spark/launcher/SparkSubmitCommandBuilder.java  |  55 +++-----
 .../spark/launcher/InProcessLauncherSuite.java     |  43 +++---
 .../sql/connector/util/V2ExpressionSQLBuilder.java | 154 +++++----------
 .../parquet/ParquetVectorUpdaterFactory.java       |  39 +++---
 .../parquet/VectorizedColumnReader.java            |  50 +++---
 .../parquet/VectorizedRleValuesReader.java         |  89 +++++------
 14 files changed, 251 insertions(+), 405 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java b/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java
index 1adb9cfe5d3..5a25bdda233 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java
@@ -38,17 +38,17 @@ public class DBProvider {
       StoreVersion version,
       ObjectMapper mapper) throws IOException {
     if (dbFile != null) {
-      switch (dbBackend) {
-        case LEVELDB:
+      return switch (dbBackend) {
+        case LEVELDB -> {
           org.iq80.leveldb.DB levelDB = LevelDBProvider.initLevelDB(dbFile, version, mapper);
           logger.warn("The LEVELDB is deprecated. Please use ROCKSDB instead.");
-          return levelDB != null ? new LevelDB(levelDB) : null;
-        case ROCKSDB:
+          yield levelDB != null ? new LevelDB(levelDB) : null;
+        }
+        case ROCKSDB -> {
           org.rocksdb.RocksDB rocksDB = RocksDBProvider.initRockDB(dbFile, version, mapper);
-          return rocksDB != null ? new RocksDB(rocksDB) : null;
-        default:
-          throw new IllegalArgumentException("Unsupported DBBackend: " + dbBackend);
-      }
+          yield rocksDB != null ? new RocksDB(rocksDB) : null;
+        }
+      };
     }
     return null;
   }
diff --git a/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
index 55a0cc73f8b..40495d6912c 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
@@ -67,14 +67,10 @@ public class RpcIntegrationSuite {
       String msg = JavaUtils.bytesToString(message);
       String[] parts = msg.split("/");
       switch (parts[0]) {
-        case "hello":
+        case "hello" ->
           callback.onSuccess(JavaUtils.stringToBytes("Hello, " + parts[1] + "!"));
-          break;
-        case "return error":
-          callback.onFailure(new RuntimeException("Returned: " + parts[1]));
-          break;
-        case "throw error":
-          throw new RuntimeException("Thrown: " + parts[1]);
+        case "return error" -> callback.onFailure(new RuntimeException("Returned: " + parts[1]));
+        case "throw error" -> throw new RuntimeException("Thrown: " + parts[1]);
       }
     }
diff --git a/common/network-common/src/test/java/org/apache/spark/network/StreamSuite.java b/common/network-common/src/test/java/org/apache/spark/network/StreamSuite.java
index 096dfc8b1cf..4f4637e302b 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/StreamSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/StreamSuite.java
@@ -185,27 +185,26 @@ public class StreamSuite {
         ByteArrayOutputStream baos = null;

         switch (streamId) {
-          case "largeBuffer":
+          case "largeBuffer" -> {
             baos = new ByteArrayOutputStream();
             out = baos;
             srcBuffer = testData.largeBuffer;
-            break;
-          case "smallBuffer":
+          }
+          case "smallBuffer" -> {
             baos = new ByteArrayOutputStream();
             out = baos;
             srcBuffer = testData.smallBuffer;
-            break;
-          case "file":
+          }
+          case "file" -> {
             outFile = File.createTempFile("data", ".tmp", testData.tempDir);
             out = new FileOutputStream(outFile);
-            break;
-          case "emptyBuffer":
+          }
+          case "emptyBuffer" -> {
             baos = new ByteArrayOutputStream();
             out = baos;
             srcBuffer = testData.emptyBuffer;
-            break;
-          default:
-            throw new IllegalArgumentException(streamId);
+          }
+          default -> throw new IllegalArgumentException(streamId);
         }

         TestCallback callback = new TestCallback(out);
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
index 6993be4c430..1feac49752c 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
@@ -50,24 +50,25 @@ public class ShuffleChecksumHelper {
   private static Checksum[] getChecksumsByAlgorithm(int num, String algorithm) {
     Checksum[] checksums;
     switch (algorithm) {
-      case "ADLER32":
+      case "ADLER32" -> {
         checksums = new Adler32[num];
         for (int i = 0; i < num; i++) {
           checksums[i] = new Adler32();
         }
-        return checksums;
+      }

-      case "CRC32":
+      case "CRC32" -> {
         checksums = new CRC32[num];
         for (int i = 0; i < num; i++) {
           checksums[i] = new CRC32();
         }
-        return checksums;
+      }

-      default:
-        throw new UnsupportedOperationException(
-          "Unsupported shuffle checksum algorithm: " + algorithm);
+      default -> throw new UnsupportedOperationException(
+        "Unsupported shuffle checksum algorithm: " + algorithm);
     }
+
+    return checksums;
   }

   public static Checksum getChecksumByAlgorithm(String algorithm) {
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/UnsafeAlignedOffset.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/UnsafeAlignedOffset.java
index 2dfd1ecb635..fcb3ab9e6c1 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/UnsafeAlignedOffset.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/UnsafeAlignedOffset.java
@@ -53,13 +53,9 @@ public class UnsafeAlignedOffset {

   public static void putSize(Object object, long offset, int value) {
     switch (getUaoSize()) {
-      case 4:
-        Platform.putInt(object, offset, value);
-        break;
-      case 8:
-        Platform.putLong(object, offset, value);
-        break;
-      default:
+      case 4 -> Platform.putInt(object, offset, value);
+      case 8 -> Platform.putLong(object, offset, value);
+      default ->
         // checkstyle.off: RegexpSinglelineJava
         throw new AssertionError("Illegal UAO_SIZE");
         // checkstyle.on: RegexpSinglelineJava
diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index cb132f1050d..f0da1eb10f8 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -187,16 +187,9 @@ public class SparkLauncherSuite extends BaseSuite {
     InProcessLauncher launcher = new InProcessLauncher()
       .setAppResource(SparkLauncher.NO_RESOURCE);
     switch (args.length) {
-      case 2:
-        launcher.addSparkArg(args[0], args[1]);
-        break;
-
-      case 1:
-        launcher.addSparkArg(args[0]);
-        break;
-
-      default:
-        fail("FIXME: invalid test.");
+      case 2 -> launcher.addSparkArg(args[0], args[1]);
+      case 1 -> launcher.addSparkArg(args[0]);
+      default -> fail("FIXME: invalid test.");
     }

     SparkAppHandle handle = launcher.startApplication();
diff --git a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
index 172fb8c5608..eb57d43c853 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
@@ -143,59 +143,61 @@ class CommandBuilderUtils {
         escapeNext = false;
       } else if (inOpt) {
         switch (c) {
-          case '\\':
-            if (inSingleQuote) {
-              opt.appendCodePoint(c);
-            } else {
-              escapeNext = true;
+          case '\\' -> {
+            if (inSingleQuote) {
+              opt.appendCodePoint(c);
+            } else {
+              escapeNext = true;
+            }
           }
-            break;
-          case '\'':
-            if (inDoubleQuote) {
-              opt.appendCodePoint(c);
-            } else {
-              inSingleQuote = !inSingleQuote;
+          case '\'' -> {
+            if (inDoubleQuote) {
+              opt.appendCodePoint(c);
+            } else {
+              inSingleQuote = !inSingleQuote;
+            }
           }
-            break;
-          case '"':
-            if (inSingleQuote) {
-              opt.appendCodePoint(c);
-            } else {
-              inDoubleQuote = !inDoubleQuote;
+          case '"' -> {
+            if (inSingleQuote) {
+              opt.appendCodePoint(c);
+            } else {
+              inDoubleQuote = !inDoubleQuote;
+            }
           }
-            break;
-          default:
-            if (!Character.isWhitespace(c) || inSingleQuote || inDoubleQuote) {
-              opt.appendCodePoint(c);
-            } else {
-              opts.add(opt.toString());
-              opt.setLength(0);
-              inOpt = false;
-              hasData = false;
+          default -> {
+            if (!Character.isWhitespace(c) || inSingleQuote || inDoubleQuote) {
+              opt.appendCodePoint(c);
+            } else {
+              opts.add(opt.toString());
+              opt.setLength(0);
+              inOpt = false;
+              hasData = false;
+            }
           }
         }
       } else {
         switch (c) {
-          case '\'':
-            inSingleQuote = true;
-            inOpt = true;
-            hasData = true;
-            break;
-          case '"':
-            inDoubleQuote = true;
-            inOpt = true;
-            hasData = true;
-            break;
-          case '\\':
-            escapeNext = true;
-            inOpt = true;
-            hasData = true;
-            break;
-          default:
-            if (!Character.isWhitespace(c)) {
+          case '\'' -> {
+            inSingleQuote = true;
+            inOpt = true;
+            hasData = true;
+          }
+          case '"' -> {
+            inDoubleQuote = true;
+            inOpt = true;
+            hasData = true;
+          }
+          case '\\' -> {
+            escapeNext = true;
             inOpt = true;
             hasData = true;
-            opt.appendCodePoint(c);
+          }
+          default -> {
+            if (!Character.isWhitespace(c)) {
+              inOpt = true;
+              hasData = true;
+              opt.appendCodePoint(c);
+            }
           }
         }
       }
@@ -256,12 +258,8 @@ class CommandBuilderUtils {
     for (int i = 0; i < arg.length(); i++) {
       int cp = arg.codePointAt(i);
       switch (cp) {
-        case '"':
-          quoted.append('"');
-          break;
-
-        default:
-          break;
+        case '"' -> quoted.append('"');
+        default -> {}
       }
       quoted.appendCodePoint(cp);
     }
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
index 4ea9cd5a44d..8d95bc06d7a 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
@@ -44,49 +44,46 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder {
   public List<String> buildCommand(Map<String, String> env)
       throws IOException, IllegalArgumentException {
     List<String> javaOptsKeys = new ArrayList<>();
-    String memKey = null;
     String extraClassPath = null;

     // Master, Worker, HistoryServer, ExternalShuffleService use
    // SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
-    switch (className) {
-      case "org.apache.spark.deploy.master.Master":
+    String memKey = switch (className) {
+      case "org.apache.spark.deploy.master.Master" -> {
         javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
         javaOptsKeys.add("SPARK_MASTER_OPTS");
         extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
-        memKey = "SPARK_DAEMON_MEMORY";
-        break;
-      case "org.apache.spark.deploy.worker.Worker":
+        yield "SPARK_DAEMON_MEMORY";
+      }
+      case "org.apache.spark.deploy.worker.Worker" -> {
         javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
         javaOptsKeys.add("SPARK_WORKER_OPTS");
         extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
-        memKey = "SPARK_DAEMON_MEMORY";
-        break;
-      case "org.apache.spark.deploy.history.HistoryServer":
+        yield "SPARK_DAEMON_MEMORY";
+      }
+      case "org.apache.spark.deploy.history.HistoryServer" -> {
         javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
         javaOptsKeys.add("SPARK_HISTORY_OPTS");
         extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
-        memKey = "SPARK_DAEMON_MEMORY";
-        break;
-      case "org.apache.spark.executor.CoarseGrainedExecutorBackend":
+        yield "SPARK_DAEMON_MEMORY";
+      }
+      case "org.apache.spark.executor.CoarseGrainedExecutorBackend" -> {
         javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
-        memKey = "SPARK_EXECUTOR_MEMORY";
         extraClassPath = getenv("SPARK_EXECUTOR_CLASSPATH");
-        break;
-      case "org.apache.spark.deploy.ExternalShuffleService":
+        yield "SPARK_EXECUTOR_MEMORY";
+      }
+      case "org.apache.spark.deploy.ExternalShuffleService" -> {
         javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
         javaOptsKeys.add("SPARK_SHUFFLE_OPTS");
         extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
-        memKey = "SPARK_DAEMON_MEMORY";
-        break;
-      case "org.apache.hive.beeline.BeeLine":
+        yield "SPARK_DAEMON_MEMORY";
+      }
+      case "org.apache.hive.beeline.BeeLine" -> {
         javaOptsKeys.add("SPARK_BEELINE_OPTS");
-        memKey = "SPARK_BEELINE_MEMORY";
-        break;
-      default:
-        memKey = "SPARK_DRIVER_MEMORY";
-        break;
-    }
+        yield "SPARK_BEELINE_MEMORY";
+      }
+      default -> "SPARK_DRIVER_MEMORY";
+    };

     List<String> cmd = buildJavaCommand(extraClassPath);
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index 6c103d3cdf9..eb2fb1515cb 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -490,37 +490,21 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
     @Override
     protected boolean handle(String opt, String value) {
       switch (opt) {
-        case MASTER:
-          master = value;
-          break;
-        case REMOTE:
-          remote = value;
-          break;
-        case DEPLOY_MODE:
-          deployMode = value;
-          break;
-        case PROPERTIES_FILE:
-          propertiesFile = value;
-          break;
-        case DRIVER_MEMORY:
-          conf.put(SparkLauncher.DRIVER_MEMORY, value);
-          break;
-        case DRIVER_JAVA_OPTIONS:
-          conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
-          break;
-        case DRIVER_LIBRARY_PATH:
-          conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
-          break;
-        case DRIVER_CLASS_PATH:
-          conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value);
-          break;
-        case CONF:
+        case MASTER -> master = value;
+        case REMOTE -> remote = value;
+        case DEPLOY_MODE -> deployMode = value;
+        case PROPERTIES_FILE -> propertiesFile = value;
+        case DRIVER_MEMORY -> conf.put(SparkLauncher.DRIVER_MEMORY, value);
+        case DRIVER_JAVA_OPTIONS -> conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
+        case DRIVER_LIBRARY_PATH -> conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
+        case DRIVER_CLASS_PATH -> conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value);
+        case CONF -> {
           checkArgument(value != null, "Missing argument to %s", CONF);
           String[] setConf = value.split("=", 2);
           checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
           conf.put(setConf[0], setConf[1]);
-          break;
-        case CLASS:
+        }
+        case CLASS -> {
           // The special classes require some special command line handling, since they allow
           // mixing spark-submit arguments with arguments that should be propagated to the shell
           // itself. Note that for this to work, the "--class" argument must come before any
@@ -530,25 +514,22 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
             allowsMixedArguments = true;
             appResource = specialClasses.get(value);
           }
-          break;
-        case KILL_SUBMISSION:
-        case STATUS:
+        }
+        case KILL_SUBMISSION, STATUS -> {
           isSpecialCommand = true;
           parsedArgs.add(opt);
           parsedArgs.add(value);
-          break;
-        case HELP:
-        case USAGE_ERROR:
-        case VERSION:
+        }
+        case HELP, USAGE_ERROR, VERSION -> {
           isSpecialCommand = true;
           parsedArgs.add(opt);
-          break;
-        default:
+        }
+        default -> {
           parsedArgs.add(opt);
           if (value != null) {
             parsedArgs.add(value);
           }
-          break;
+        }
       }
       return true;
     }
diff --git a/launcher/src/test/java/org/apache/spark/launcher/InProcessLauncherSuite.java b/launcher/src/test/java/org/apache/spark/launcher/InProcessLauncherSuite.java
index d901255fb13..3a29e7c6df8 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/InProcessLauncherSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/InProcessLauncherSuite.java
@@ -94,16 +94,11 @@ public class InProcessLauncherSuite extends BaseSuite {
       if (opt == CONF) {
         String[] conf = value.split("=");
         switch(conf[0]) {
-          case LauncherProtocol.CONF_LAUNCHER_PORT:
-            port.set(conf[1]);
-            break;
-
-          case LauncherProtocol.CONF_LAUNCHER_SECRET:
-            secret.set(conf[1]);
-            break;
-
-          default:
+          case LauncherProtocol.CONF_LAUNCHER_PORT -> port.set(conf[1]);
+          case LauncherProtocol.CONF_LAUNCHER_SECRET -> secret.set(conf[1]);
+          default -> {
             // no op
+          }
         }
       }

@@ -128,25 +123,19 @@ public class InProcessLauncherSuite extends BaseSuite {
       String test = args[args.length - 1];
       switch (test) {
-        case TEST_SUCCESS:
-          break;
-
-        case TEST_FAILURE:
-          throw new IllegalStateException(TEST_FAILURE_MESSAGE);
-
-        case TEST_KILL:
-          try {
-            // Wait for a reasonable amount of time to avoid the test hanging forever on failure,
-            // but still allowing for time outs to hopefully not occur on busy machines.
-            Thread.sleep(10000);
-            fail("Did not get expected interrupt after 10s.");
-          } catch (InterruptedException ie) {
-            // Expected.
+        case TEST_SUCCESS -> {}
+        case TEST_FAILURE -> throw new IllegalStateException(TEST_FAILURE_MESSAGE);
+        case TEST_KILL -> {
+          try {
+            // Wait for a reasonable amount of time to avoid the test hanging forever on failure,
+            // but still allowing for time outs to hopefully not occur on busy machines.
+            Thread.sleep(10000);
+            fail("Did not get expected interrupt after 10s.");
+          } catch (InterruptedException ie) {
+            // Expected.
+          }
           }
-          break;
-
-        default:
-          fail("Unknown test " + test);
+        default -> fail("Unknown test " + test);
       }
     } catch (Throwable t) {
       lastError = t;
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
index 30ca02a36f1..5cd28f1c259 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
@@ -60,17 +60,10 @@ public class V2ExpressionSQLBuilder {

     for (char c : str.toCharArray()) {
       switch (c) {
-        case '_':
-          builder.append("\\_");
-          break;
-        case '%':
-          builder.append("\\%");
-          break;
-        case '\'':
-          builder.append("\\\'");
-          break;
-        default:
-          builder.append(c);
+        case '_' -> builder.append("\\_");
+        case '%' -> builder.append("\\%");
+        case '\'' -> builder.append("\\\'");
+        default -> builder.append(c);
       }
     }
@@ -91,121 +84,48 @@ public class V2ExpressionSQLBuilder {
         build(sortOrder.expression()), sortOrder.direction(), sortOrder.nullOrdering());
     } else if (expr instanceof GeneralScalarExpression e) {
       String name = e.name();
-      switch (name) {
-        case "IN": {
+      return switch (name) {
+        case "IN" -> {
           Expression[] expressions = e.children();
           List<String> children = expressionsToStringList(expressions, 1, expressions.length - 1);
-          return visitIn(build(expressions[0]), children);
+          yield visitIn(build(expressions[0]), children);
         }
-        case "IS_NULL":
-          return visitIsNull(build(e.children()[0]));
-        case "IS_NOT_NULL":
-          return visitIsNotNull(build(e.children()[0]));
-        case "STARTS_WITH":
-          return visitStartsWith(build(e.children()[0]), build(e.children()[1]));
-        case "ENDS_WITH":
-          return visitEndsWith(build(e.children()[0]), build(e.children()[1]));
-        case "CONTAINS":
-          return visitContains(build(e.children()[0]), build(e.children()[1]));
-        case "=":
-        case "<>":
-        case "<=>":
-        case "<":
-        case "<=":
-        case ">":
-        case ">=":
-          return visitBinaryComparison(
-            name, inputToSQL(e.children()[0]), inputToSQL(e.children()[1]));
-        case "+":
-        case "*":
-        case "/":
-        case "%":
-        case "&":
-        case "|":
-        case "^":
-          return visitBinaryArithmetic(
-            name, inputToSQL(e.children()[0]), inputToSQL(e.children()[1]));
-        case "-":
+        case "IS_NULL" -> visitIsNull(build(e.children()[0]));
+        case "IS_NOT_NULL" -> visitIsNotNull(build(e.children()[0]));
+        case "STARTS_WITH" -> visitStartsWith(build(e.children()[0]), build(e.children()[1]));
+        case "ENDS_WITH" -> visitEndsWith(build(e.children()[0]), build(e.children()[1]));
+        case "CONTAINS" -> visitContains(build(e.children()[0]), build(e.children()[1]));
+        case "=", "<>", "<=>", "<", "<=", ">", ">=" ->
+          visitBinaryComparison(name, inputToSQL(e.children()[0]), inputToSQL(e.children()[1]));
+        case "+", "*", "/", "%", "&", "|", "^" ->
+          visitBinaryArithmetic(name, inputToSQL(e.children()[0]), inputToSQL(e.children()[1]));
+        case "-" -> {
           if (e.children().length == 1) {
-            return visitUnaryArithmetic(name, inputToSQL(e.children()[0]));
+            yield visitUnaryArithmetic(name, inputToSQL(e.children()[0]));
           } else {
-            return visitBinaryArithmetic(
+            yield visitBinaryArithmetic(
               name, inputToSQL(e.children()[0]), inputToSQL(e.children()[1]));
           }
-        case "AND":
-          return visitAnd(name, build(e.children()[0]), build(e.children()[1]));
-        case "OR":
-          return visitOr(name, build(e.children()[0]), build(e.children()[1]));
-        case "NOT":
-          return visitNot(build(e.children()[0]));
-        case "~":
-          return visitUnaryArithmetic(name, inputToSQL(e.children()[0]));
-        case "ABS":
-        case "COALESCE":
-        case "GREATEST":
-        case "LEAST":
-        case "RAND":
-        case "LOG":
-        case "LOG10":
-        case "LOG2":
-        case "LN":
-        case "EXP":
-        case "POWER":
-        case "SQRT":
-        case "FLOOR":
-        case "CEIL":
-        case "ROUND":
-        case "SIN":
-        case "SINH":
-        case "COS":
-        case "COSH":
-        case "TAN":
-        case "TANH":
-        case "COT":
-        case "ASIN":
-        case "ASINH":
-        case "ACOS":
-        case "ACOSH":
-        case "ATAN":
-        case "ATANH":
-        case "ATAN2":
-        case "CBRT":
-        case "DEGREES":
-        case "RADIANS":
-        case "SIGN":
-        case "WIDTH_BUCKET":
-        case "SUBSTRING":
-        case "UPPER":
-        case "LOWER":
-        case "TRANSLATE":
-        case "DATE_ADD":
-        case "DATE_DIFF":
-        case "TRUNC":
-        case "AES_ENCRYPT":
-        case "AES_DECRYPT":
-        case "SHA1":
-        case "SHA2":
-        case "MD5":
-        case "CRC32":
-        case "BIT_LENGTH":
-        case "CHAR_LENGTH":
-        case "CONCAT":
-          return visitSQLFunction(name, expressionsToStringArray(e.children()));
-        case "CASE_WHEN": {
-          return visitCaseWhen(expressionsToStringArray(e.children()));
         }
-        case "TRIM":
-          return visitTrim("BOTH", expressionsToStringArray(e.children()));
-        case "LTRIM":
-          return visitTrim("LEADING", expressionsToStringArray(e.children()));
-        case "RTRIM":
-          return visitTrim("TRAILING", expressionsToStringArray(e.children()));
-        case "OVERLAY":
-          return visitOverlay(expressionsToStringArray(e.children()));
+        case "AND" -> visitAnd(name, build(e.children()[0]), build(e.children()[1]));
+        case "OR" -> visitOr(name, build(e.children()[0]), build(e.children()[1]));
+        case "NOT" -> visitNot(build(e.children()[0]));
+        case "~" -> visitUnaryArithmetic(name, inputToSQL(e.children()[0]));
+        case "ABS", "COALESCE", "GREATEST", "LEAST", "RAND", "LOG", "LOG10", "LOG2", "LN", "EXP",
+          "POWER", "SQRT", "FLOOR", "CEIL", "ROUND", "SIN", "SINH", "COS", "COSH", "TAN", "TANH",
+          "COT", "ASIN", "ASINH", "ACOS", "ACOSH", "ATAN", "ATANH", "ATAN2", "CBRT", "DEGREES",
+          "RADIANS", "SIGN", "WIDTH_BUCKET", "SUBSTRING", "UPPER", "LOWER", "TRANSLATE",
+          "DATE_ADD", "DATE_DIFF", "TRUNC", "AES_ENCRYPT", "AES_DECRYPT", "SHA1", "SHA2", "MD5",
+          "CRC32", "BIT_LENGTH", "CHAR_LENGTH", "CONCAT" ->
+          visitSQLFunction(name, expressionsToStringArray(e.children()));
+        case "CASE_WHEN" -> visitCaseWhen(expressionsToStringArray(e.children()));
+        case "TRIM" -> visitTrim("BOTH", expressionsToStringArray(e.children()));
+        case "LTRIM" -> visitTrim("LEADING", expressionsToStringArray(e.children()));
+        case "RTRIM" -> visitTrim("TRAILING", expressionsToStringArray(e.children()));
+        case "OVERLAY" -> visitOverlay(expressionsToStringArray(e.children()));
         // TODO supports other expressions
-        default:
-          return visitUnexpectedExpr(expr);
-      }
+        default -> visitUnexpectedExpr(expr);
+      };
     } else if (expr instanceof Min min) {
       return visitAggregateFunction("MIN", false, expressionsToStringArray(min.children()));
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 54b96bfb873..1ed6c4329eb 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -68,12 +68,12 @@ public class ParquetVectorUpdaterFactory {
     PrimitiveType.PrimitiveTypeName typeName =
       descriptor.getPrimitiveType().getPrimitiveTypeName();
     switch (typeName) {
-      case BOOLEAN:
+      case BOOLEAN -> {
         if (sparkType == DataTypes.BooleanType) {
           return new BooleanUpdater();
         }
-        break;
-      case INT32:
+      }
+      case INT32 -> {
         if (sparkType == DataTypes.IntegerType || canReadAsIntDecimal(descriptor, sparkType)) {
           return new IntegerUpdater();
         } else if (sparkType == DataTypes.LongType && isUnsignedIntTypeMatched(32)) {
@@ -95,8 +95,8 @@ public class ParquetVectorUpdaterFactory {
         } else if (sparkType instanceof YearMonthIntervalType) {
           return new IntegerUpdater();
         }
-        break;
-      case INT64:
+      }
+      case INT64 -> {
         // This is where we implement support for the valid type conversions.
         if (sparkType == DataTypes.LongType || canReadAsLongDecimal(descriptor, sparkType)) {
           if (DecimalType.is32BitDecimalType(sparkType)) {
@@ -110,7 +110,7 @@ public class ParquetVectorUpdaterFactory {
           // fallbacks. We read them as decimal values.
           return new UnsignedLongUpdater();
         } else if (isTimestamp(sparkType) &&
-          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
+            isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
           validateTimestampType(sparkType);
           if ("CORRECTED".equals(datetimeRebaseMode)) {
             return new LongUpdater();
@@ -119,7 +119,7 @@ public class ParquetVectorUpdaterFactory {
             return new LongWithRebaseUpdater(failIfRebase, datetimeRebaseTz);
           }
         } else if (isTimestamp(sparkType) &&
-          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
+            isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
           validateTimestampType(sparkType);
           if ("CORRECTED".equals(datetimeRebaseMode)) {
             return new LongAsMicrosUpdater();
@@ -130,18 +130,18 @@ public class ParquetVectorUpdaterFactory {
         } else if (sparkType instanceof DayTimeIntervalType) {
           return new LongUpdater();
         }
-        break;
-      case FLOAT:
+      }
+      case FLOAT -> {
         if (sparkType == DataTypes.FloatType) {
           return new FloatUpdater();
         }
-        break;
-      case DOUBLE:
+      }
+      case DOUBLE -> {
         if (sparkType == DataTypes.DoubleType) {
           return new DoubleUpdater();
         }
-        break;
-      case INT96:
+      }
+      case INT96 -> {
         if (sparkType == DataTypes.TimestampNTZType) {
           convertErrorForTimestampNTZ(typeName.name());
         } else if (sparkType == DataTypes.TimestampType) {
@@ -163,14 +163,14 @@ public class ParquetVectorUpdaterFactory {
             }
           }
         }
-        break;
-      case BINARY:
+      }
+      case BINARY -> {
         if (sparkType == DataTypes.StringType || sparkType == DataTypes.BinaryType ||
             canReadAsBinaryDecimal(descriptor, sparkType)) {
           return new BinaryUpdater();
         }
-        break;
-      case FIXED_LEN_BYTE_ARRAY:
+      }
+      case FIXED_LEN_BYTE_ARRAY -> {
         int arrayLen = descriptor.getPrimitiveType().getTypeLength();
         if (canReadAsIntDecimal(descriptor, sparkType)) {
           return new FixedLenByteArrayAsIntUpdater(arrayLen);
@@ -181,9 +181,8 @@ public class ParquetVectorUpdaterFactory {
         } else if (sparkType == DataTypes.BinaryType) {
           return new FixedLenByteArrayUpdater(arrayLen);
         }
-        break;
-      default:
-        break;
+      }
+      default -> {}
     }

     // If we get here, it means the combination of Spark and Parquet type is invalid or not
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index f185b251ed9..04fbe716ad9 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -141,26 +141,20 @@ public class VectorizedColumnReader {
   }

   private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName) {
-    boolean isSupported = false;
-    switch (typeName) {
-      case INT32:
-        isSupported = !(logicalTypeAnnotation instanceof DateLogicalTypeAnnotation) ||
+    return switch (typeName) {
+      case INT32 ->
+        !(logicalTypeAnnotation instanceof DateLogicalTypeAnnotation) ||
           "CORRECTED".equals(datetimeRebaseMode);
-        break;
-      case INT64:
+      case INT64 -> {
         if (updaterFactory.isTimestampTypeMatched(TimeUnit.MICROS)) {
-          isSupported = "CORRECTED".equals(datetimeRebaseMode);
+          yield "CORRECTED".equals(datetimeRebaseMode);
         } else {
-          isSupported = !updaterFactory.isTimestampTypeMatched(TimeUnit.MILLIS);
+          yield !updaterFactory.isTimestampTypeMatched(TimeUnit.MILLIS);
         }
-        break;
-      case FLOAT:
-      case DOUBLE:
-      case BINARY:
-        isSupported = true;
-        break;
-    }
-    return isSupported;
+      }
+      case FLOAT, DOUBLE, BINARY -> true;
+      default -> false;
+    };
   }

   /**
@@ -325,28 +319,24 @@ public class VectorizedColumnReader {
   }

   private ValuesReader getValuesReader(Encoding encoding) {
-    switch (encoding) {
-      case PLAIN:
-        return new VectorizedPlainValuesReader();
-      case DELTA_BYTE_ARRAY:
-        return new VectorizedDeltaByteArrayReader();
-      case DELTA_LENGTH_BYTE_ARRAY:
-        return new VectorizedDeltaLengthByteArrayReader();
-      case DELTA_BINARY_PACKED:
-        return new VectorizedDeltaBinaryPackedReader();
-      case RLE:
+    return switch (encoding) {
+      case PLAIN -> new VectorizedPlainValuesReader();
+      case DELTA_BYTE_ARRAY -> new VectorizedDeltaByteArrayReader();
+      case DELTA_LENGTH_BYTE_ARRAY -> new VectorizedDeltaLengthByteArrayReader();
+      case DELTA_BINARY_PACKED -> new VectorizedDeltaBinaryPackedReader();
+      case RLE -> {
         PrimitiveType.PrimitiveTypeName typeName =
           this.descriptor.getPrimitiveType().getPrimitiveTypeName();
         // RLE encoding only supports boolean type `Values`, and `bitwidth` is always 1.
         if (typeName == BOOLEAN) {
-          return new VectorizedRleValuesReader(1);
+          yield new VectorizedRleValuesReader(1);
         } else {
           throw new UnsupportedOperationException(
             "RLE encoding is not supported for values of type: " + typeName);
         }
-      default:
-        throw new UnsupportedOperationException("Unsupported encoding: " + encoding);
-    }
+      }
+      default -> throw new UnsupportedOperationException("Unsupported encoding: " + encoding);
+    };
   }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index 58add7bc04e..584aaa2d118 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -236,15 +236,15 @@ public final class VectorizedRleValuesReader extends ValuesReader
         n = (int) (end - start + 1);

         switch (mode) {
-          case RLE:
+          case RLE -> {
             if (currentValue == state.maxDefinitionLevel) {
               updater.readValues(n, state.valueOffset, values, valueReader);
             } else {
               nulls.putNulls(state.valueOffset, n);
             }
             state.valueOffset += n;
-            break;
-          case PACKED:
+          }
+          case PACKED -> {
             for (int i = 0; i < n; ++i) {
               int currentValue = currentBuffer[currentBufferIdx++];
               if (currentValue == state.maxDefinitionLevel) {
@@ -253,7 +253,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
                 nulls.putNull(state.valueOffset++);
               }
             }
-            break;
+          }
         }
         state.levelOffset += n;
         leftInBatch -= n;
@@ -404,7 +404,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
       long rangeEnd = state.currentRangeEnd();

       switch (mode) {
-        case RLE:
+        case RLE -> {
          // This RLE block is consist of top-level rows, so we'll need to check
          // if the rows should be skipped according to row indexes.
          if (currentValue == 0) {
@@ -462,8 +462,8 @@ public final class VectorizedRleValuesReader extends ValuesReader
             leftInPage -= valuesLeftInBlock;
             currentCount -= valuesLeftInBlock;
           }
-          break;
-        case PACKED:
+        }
+        case PACKED -> {
           int i = 0;

           for (; i < valuesLeftInBlock; i++) {
@@ -498,7 +498,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
           leftInPage -= i;
           currentCount -= i;
           currentBufferIdx += i;
-          break;
+        }
       }
     }
@@ -626,7 +626,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
       VectorizedValuesReader valueReader,
       ParquetVectorUpdater updater) {
     switch (mode) {
-      case RLE:
+      case RLE -> {
         if (currentValue == state.maxDefinitionLevel) {
           updater.readValues(n, state.valueOffset, values, valueReader);
         } else {
@@ -634,8 +634,8 @@ public final class VectorizedRleValuesReader extends ValuesReader
         }
         state.valueOffset += n;
         defLevels.putInts(state.levelOffset, n, currentValue);
-        break;
-      case PACKED:
+      }
+      case PACKED -> {
         for (int i = 0; i < n; ++i) {
           int currentValue = currentBuffer[currentBufferIdx++];
           if (currentValue == state.maxDefinitionLevel) {
@@ -645,7 +645,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
           }
           defLevels.putInt(state.levelOffset + i, currentValue);
         }
-        break;
+      }
     }
   }
@@ -662,14 +662,14 @@ public final class VectorizedRleValuesReader extends ValuesReader
       if (currentCount == 0 && !readNextGroup()) break;
       int num = Math.min(n, this.currentCount);
       switch (mode) {
-        case RLE:
+        case RLE -> {
          // We only need to skip non-null values from `valuesReader` since nulls are represented
          // via definition levels which are skipped here via decrementing `currentCount`.
           if (currentValue == state.maxDefinitionLevel) {
             updater.skipValues(num, valuesReader);
           }
-          break;
-        case PACKED:
+        }
+        case PACKED -> {
           int totalSkipNum = 0;
           for (int i = 0; i < num; ++i) {
             // Same as above, only skip non-null values from `valuesReader`
@@ -678,7 +678,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
             }
           }
           updater.skipValues(totalSkipNum, valuesReader);
-          break;
+        }
       }
       currentCount -= num;
       n -= num;
@@ -695,13 +695,11 @@ public final class VectorizedRleValuesReader extends ValuesReader
       if (currentCount == 0 && !readNextGroup()) break;
       int n = Math.min(left, this.currentCount);
       switch (mode) {
-        case RLE:
-          c.putInts(rowId, n, currentValue);
-          break;
-        case PACKED:
+        case RLE -> c.putInts(rowId, n, currentValue);
+        case PACKED -> {
           c.putInts(rowId, n, currentBuffer, currentBufferIdx);
           currentBufferIdx += n;
-          break;
+        }
       }
       rowId += n;
       left -= n;
@@ -772,15 +770,13 @@ public final class VectorizedRleValuesReader extends ValuesReader
       if (this.currentCount == 0) this.readNextGroup();
       int n = Math.min(left, this.currentCount);
       switch (mode) {
-        case RLE:
-          c.putBooleans(rowId, n, currentValue != 0);
-          break;
-        case PACKED:
+        case RLE -> c.putBooleans(rowId, n, currentValue != 0);
+        case PACKED -> {
           for (int i = 0; i < n; ++i) {
             // For Boolean types, `currentBuffer[currentBufferIdx++]` can only be 0 or 1
             c.putByte(rowId + i, (byte) currentBuffer[currentBufferIdx++]);
           }
-          break;
+        }
       }
       rowId += n;
       left -= n;
@@ -878,27 +874,23 @@ public final class VectorizedRleValuesReader extends ValuesReader
    * Reads the next byteWidth little endian int.
    */
   private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
-    switch (bytesWidth) {
-      case 0:
-        return 0;
-      case 1:
-        return in.read();
-      case 2: {
+    return switch (bytesWidth) {
+      case 0 -> 0;
+      case 1 -> in.read();
+      case 2 -> {
         int ch2 = in.read();
         int ch1 = in.read();
-        return (ch1 << 8) + ch2;
+        yield (ch1 << 8) + ch2;
       }
-      case 3: {
+      case 3 -> {
         int ch3 = in.read();
         int ch2 = in.read();
         int ch1 = in.read();
-        return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
+        yield (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
       }
-      case 4: {
-        return readIntLittleEndian();
-      }
-    }
-    throw new RuntimeException("Unreachable");
+      case 4 -> readIntLittleEndian();
+      default -> throw new RuntimeException("Unreachable");
+    };
   }

   /**
@@ -914,11 +906,11 @@ public final class VectorizedRleValuesReader extends ValuesReader
       int header = readUnsignedVarInt();
       this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
       switch (mode) {
-        case RLE:
+        case RLE -> {
           this.currentCount = header >>> 1;
           this.currentValue = readIntLittleEndianPaddedOnBitWidth();
-          break;
-        case PACKED:
+        }
+        case PACKED -> {
           int numGroups = header >>> 1;
           this.currentCount = numGroups * 8;

@@ -933,9 +925,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
           this.packer.unpack8Values(buffer, buffer.position(), this.currentBuffer, valueIndex);
           valueIndex += 8;
         }
-        break;
-      default:
-        throw new ParquetDecodingException("not a valid mode " + this.mode);
+      }
       }
     } catch (IOException e) {
       throw new ParquetDecodingException("Failed to read from input stream", e);
@@ -953,11 +943,8 @@ public final class VectorizedRleValuesReader extends ValuesReader
       if (this.currentCount == 0 && !readNextGroup()) break;
       int num = Math.min(left, this.currentCount);
       switch (mode) {
-        case RLE:
-          break;
-        case PACKED:
-          currentBufferIdx += num;
-          break;
+        case RLE -> {}
+        case PACKED -> currentBufferIdx += num;
       }
       currentCount -= num;
       left -= num;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org