This is an automated email from the ASF dual-hosted git repository. iemejia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit a407d79680d01c35760f3fe4e76cd4192e34edd1 Author: Alexey Romanenko <aromanenko....@gmail.com> AuthorDate: Tue Mar 30 18:04:22 2021 +0200 [BEAM-11712] Add options for input/output paths, make it run via SparkRunner --- sdks/java/testing/tpcds/README.md | 68 ++++++++++++++++++++++ sdks/java/testing/tpcds/build.gradle | 11 +++- .../apache/beam/sdk/tpcds/BeamSqlEnvRunner.java | 14 ++--- .../java/org/apache/beam/sdk/tpcds/BeamTpcds.java | 18 +----- .../org/apache/beam/sdk/tpcds/QueryReader.java | 49 +--------------- .../apache/beam/sdk/tpcds/SqlTransformRunner.java | 9 +-- .../beam/sdk/tpcds/TableSchemaJSONLoader.java | 43 +++++--------- .../org/apache/beam/sdk/tpcds/TpcdsOptions.java | 14 +++++ .../beam/sdk/tpcds/TableSchemaJSONLoaderTest.java | 4 +- 9 files changed, 123 insertions(+), 107 deletions(-) diff --git a/sdks/java/testing/tpcds/README.md b/sdks/java/testing/tpcds/README.md new file mode 100644 index 0000000..89f8073 --- /dev/null +++ b/sdks/java/testing/tpcds/README.md @@ -0,0 +1,68 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+--> + +# TPC-DS Benchmark + +## Google Dataflow Runner + +To execute TPC-DS benchmark for 1GB dataset on Google Dataflow, run the following example command from the command line: + +```bash +./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \ + --runner=DataflowRunner \ + --queries=3,26,55 \ + --tpcParallel=2 \ + --dataDirectory=/path/to/tpcds_data/ \ + --project=apache-beam-testing \ + --stagingLocation=gs://beamsql_tpcds_1/staging \ + --tempLocation=gs://beamsql_tpcds_2/temp \ + --region=us-west1 \ + --maxNumWorkers=10" +``` + +To run a query using ZetaSQL planner (currently Query96 can be run using ZetaSQL), set the plannerName as below. If not specified, the default planner is Calcite. + +```bash +./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \ + --runner=DataflowRunner \ + --queries=96 \ + --tpcParallel=2 \ + --dataDirectory=/path/to/tpcds_data/ \ + --plannerName=org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner \ + --project=apache-beam-testing \ + --stagingLocation=gs://beamsql_tpcds_1/staging \ + --tempLocation=gs://beamsql_tpcds_2/temp \ + --region=us-west1 \ + --maxNumWorkers=10" +``` + +## Spark Runner + +To execute TPC-DS benchmark with Query3 for 1GB dataset on Apache Spark 2.x, run the following example command from the command line: + +```bash +./gradlew :sdks:java:testing:tpcds:run -Ptpcds.runner=":runners:spark:2" -Ptpcds.args=" \ + --runner=SparkRunner \ + --queries=3 \ + --tpcParallel=1 \ + --dataDirectory=/path/to/tpcds_data/ \ + --dataSize=1G \ + --resultsDirectory=/path/to/tpcds_results/" +``` diff --git a/sdks/java/testing/tpcds/build.gradle b/sdks/java/testing/tpcds/build.gradle index 6237776..79fb1e8 100644 --- a/sdks/java/testing/tpcds/build.gradle +++ b/sdks/java/testing/tpcds/build.gradle @@ -33,7 +33,7 @@ def tpcdsArgsProperty = "tpcds.args" def tpcdsRunnerProperty = "tpcds.runner" def tpcdsRunnerDependency =
project.findProperty(tpcdsRunnerProperty) ?: ":runners:direct-java" -def shouldProvideSpark = ":runners:spark".equals(tpcdsRunnerDependency) +def shouldProvideSpark = ":runners:spark:2".equals(tpcdsRunnerDependency) def isDataflowRunner = ":runners:google-cloud-dataflow-java".equals(tpcdsRunnerDependency) def runnerConfiguration = ":runners:direct-java".equals(tpcdsRunnerDependency) ? "shadow" : null @@ -88,6 +88,15 @@ if (shouldProvideSpark) { } } +// Execute the TPC-DS queries or suites via Gradle. +// +// Parameters: +// -Ptpcds.runner +// Specify a runner subproject, such as ":runners:spark:2" or ":runners:flink:1.10" +// Defaults to ":runners:direct-java" +// +// -Ptpcds.args +// Specify the command line for invoking org.apache.beam.sdk.tpcds.BeamTpcds task run(type: JavaExec) { def tpcdsArgsStr = project.findProperty(tpcdsArgsProperty) ?: "" def tpcdsArgsList = new ArrayList<String>() diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java index 43b97d2..69e676f 100644 --- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java +++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamSqlEnvRunner.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.tpcds; +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; + import com.alibaba.fastjson.JSONObject; import java.util.ArrayList; import java.util.Arrays; @@ -66,16 +68,13 @@ public class BeamSqlEnvRunner { private static final Logger LOG = LoggerFactory.getLogger(BeamSqlEnvRunner.class); private static String buildTableCreateStatement(String tableName) { - String createStatement = - "CREATE EXTERNAL TABLE " - + tableName - + " (%s) TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\", \"csvformat\": \"InformixUnload\"}'"; - return createStatement; + return "CREATE EXTERNAL TABLE " + + tableName + + " (%s) 
TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"csv\", \"csvformat\": \"InformixUnload\"}'"; } private static String buildDataLocation(String dataSize, String tableName) { - String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat"; - return dataLocation; + return DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat"; } /** @@ -107,6 +106,7 @@ public class BeamSqlEnvRunner { String tableName = entry.getKey(); String dataLocation = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat"; Schema tableSchema = schemaMap.get(tableName); + checkArgumentNotNull(tableSchema, "Table schema can't be null for table: " + tableName); Table table = Table.builder() .name(tableName) diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java index 6b25f65..3361453 100644 --- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java +++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/BeamTpcds.java @@ -17,23 +17,7 @@ */ package org.apache.beam.sdk.tpcds; -/** - * To execute this main() method, run the following example command from the command line. - * - * <p>./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \ --queries=3,26,55 \ - * --tpcParallel=2 \ --project=apache-beam-testing \ --stagingLocation=gs://beamsql_tpcds_1/staging - * \ --tempLocation=gs://beamsql_tpcds_2/temp \ --runner=DataflowRunner \ --region=us-west1 \ - * --maxNumWorkers=10" - * - * <p>To run query using ZetaSQL planner (currently query96 can be run using ZetaSQL), set the - * plannerName as below. If not specified, the default planner is Calcite. 
- * - * <p>./gradlew :sdks:java:testing:tpcds:run -Ptpcds.args="--dataSize=1G \ --queries=96 \ - * --tpcParallel=2 \ --plannerName=org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner \ - * --project=apache-beam-testing \ --stagingLocation=gs://beamsql_tpcds_1/staging \ - * --tempLocation=gs://beamsql_tpcds_2/temp \ --runner=DataflowRunner \ --region=us-west1 \ - * --maxNumWorkers=10" - */ +/** Main driver program to run TPC-DS benchmark. */ public class BeamTpcds { /** * The main method can choose to run either SqlTransformRunner.runUsingSqlTransform() or diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java index ca4cf63..7b00a37 100644 --- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java +++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/QueryReader.java @@ -17,14 +17,6 @@ */ package org.apache.beam.sdk.tpcds; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.InputStreamReader; -import java.io.Reader; -import java.net.URL; -import java.nio.charset.StandardCharsets; -import java.util.Objects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources; @@ -33,12 +25,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources; * ';'), write the query as a string and return it. */ public class QueryReader { - public static String readQuery(String queryFileName) throws Exception { - String path = "queries/" + queryFileName + ".sql"; - String query = Resources.toString(Resources.getResource(path), Charsets.UTF_8); - return query; - } - /** * Reads a query file (.sql), return the query as a string. * @@ -47,38 +33,9 @@ public class QueryReader { * @return The query string stored in this file. 
* @throws Exception */ - public static String readQuery2(String queryFileName) throws Exception { - - // Prepare the file reader. - ClassLoader classLoader = QueryReader.class.getClassLoader(); - if (classLoader == null) { - throw new RuntimeException("Can't get classloader from QueryReader."); - } + public static String readQuery(String queryFileName) throws Exception { String path = "queries/" + queryFileName + ".sql"; - - URL resource = classLoader.getResource(path); - if (resource == null) { - throw new RuntimeException("Resource for " + path + " can't be null."); - } - String queryFilePath = Objects.requireNonNull(resource).getPath(); - File queryFile = new File(queryFilePath); - Reader fileReader = - new InputStreamReader(new FileInputStream(queryFile), StandardCharsets.UTF_8); - BufferedReader reader = new BufferedReader(fileReader); - - // Read the file into stringBuilder. - StringBuilder stringBuilder = new StringBuilder(); - String line; - String ls = System.getProperty("line.separator"); - while ((line = reader.readLine()) != null) { - stringBuilder.append(line); - stringBuilder.append(ls); - } - - // Delete the last new line separator. - stringBuilder.deleteCharAt(stringBuilder.length() - 1); - reader.close(); - - return stringBuilder.toString(); + String query = Resources.toString(Resources.getResource(path), Charsets.UTF_8); + return query; } } diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java index 34274f9..4f56c1a 100644 --- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java +++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java @@ -49,8 +49,6 @@ import org.slf4j.LoggerFactory; * queries. 
*/ public class SqlTransformRunner { - private static final String DATA_DIRECTORY = "gs://beamsql_tpcds_1/data"; - private static final String RESULT_DIRECTORY = "gs://beamsql_tpcds_1/tpcds_results"; private static final String SUMMARY_START = "\n" + "TPC-DS Query Execution Summary:"; private static final List<String> SUMMARY_HEADERS_LIST = Arrays.asList( @@ -86,14 +84,13 @@ public class SqlTransformRunner { PCollectionTuple tables = PCollectionTuple.empty(pipeline); for (Map.Entry<String, Schema> tableSchema : schemaMap.entrySet()) { String tableName = tableSchema.getKey(); - System.out.println("tableName = " + tableName); // Only when queryString contains tableName, the table is relevant to this query and will be // added. This can avoid reading unnecessary data files. if (queryString.contains(tableName)) { // This is location path where the data are stored - String filePattern = DATA_DIRECTORY + "/" + dataSize + "/" + tableName + ".dat"; - System.out.println("filePattern = " + filePattern); + String filePattern = + tpcdsOptions.getDataDirectory() + "/" + dataSize + "/" + tableName + ".dat"; PCollection<Row> table = new TextTable( @@ -196,7 +193,7 @@ public class SqlTransformRunner { .apply( TextIO.write() .to( - RESULT_DIRECTORY + tpcdsOptions.getResultsDirectory() + "/" + dataSize + "/" diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java index 3a2371d..b6f8733 100644 --- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java +++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoader.java @@ -17,16 +17,15 @@ */ package org.apache.beam.sdk.tpcds; -import java.io.File; -import java.net.URL; +import java.io.IOException; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import 
java.util.Map; -import java.util.Objects; -import org.apache.beam.repackaged.core.org.apache.commons.compress.utils.FileNameUtils; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.ClassPath; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; @@ -36,12 +35,6 @@ import org.json.simple.parser.JSONParser; * table's schema into a string. */ public class TableSchemaJSONLoader { - public static String readQuery(String tableName) throws Exception { - String path = "schemas/" + tableName + ".json"; - String fixture = Resources.toString(Resources.getResource(path), Charsets.UTF_8); - return fixture; - } - /** * Read a table schema json file from resource/schemas directory, parse the file into a string * which can be utilized by BeamSqlEnv.executeDdl method. @@ -55,7 +48,6 @@ public class TableSchemaJSONLoader { public static String parseTableSchema(String tableName) throws Exception { String path = "schemas/" + tableName + ".json"; String schema = Resources.toString(Resources.getResource(path), Charsets.UTF_8); - System.out.println("schema = " + schema); JSONObject jsonObject = (JSONObject) new JSONParser().parse(schema); JSONArray jsonArray = (JSONArray) jsonObject.get("schema"); @@ -67,11 +59,11 @@ public class TableSchemaJSONLoader { StringBuilder schemaStringBuilder = new StringBuilder(); Iterator jsonArrIterator = jsonArray.iterator(); - Iterator<Map.Entry> recordIterator; + Iterator recordIterator; while (jsonArrIterator.hasNext()) { recordIterator = ((Map) jsonArrIterator.next()).entrySet().iterator(); while (recordIterator.hasNext()) { - Map.Entry pair = recordIterator.next(); + Map.Entry pair = (Map.Entry) recordIterator.next(); if (pair.getKey().equals("type")) { // If the key of the pair is "type", make some modification before 
appending it to the @@ -105,9 +97,7 @@ public class TableSchemaJSONLoader { schemaStringBuilder.deleteCharAt(schemaStringBuilder.length() - 1); } - String schemaString = schemaStringBuilder.toString(); - - return schemaString; + return schemaStringBuilder.toString(); } /** @@ -116,25 +106,20 @@ public class TableSchemaJSONLoader { * * @return The list of names of all tables. */ - public static List<String> getAllTableNames() { + public static List<String> getAllTableNames() throws IOException, URISyntaxException { ClassLoader classLoader = TableSchemaJSONLoader.class.getClassLoader(); if (classLoader == null) { throw new RuntimeException("Can't get classloader from TableSchemaJSONLoader."); } - URL resource = classLoader.getResource("schemas"); - if (resource == null) { - throw new RuntimeException("Resource for \"schemas\" can't be null."); - } - String tableDirPath = Objects.requireNonNull(resource).getPath(); - File tableDir = new File(tableDirPath); - File[] tableDirListing = tableDir.listFiles(); + ClassPath classPath = ClassPath.from(classLoader); List<String> tableNames = new ArrayList<>(); - - if (tableDirListing != null) { - for (File file : tableDirListing) { - // Remove the .json extension in file name - tableNames.add(FileNameUtils.getBaseName((file.getName()))); + for (ClassPath.ResourceInfo resourceInfo : classPath.getResources()) { + String resourceName = resourceInfo.getResourceName(); + if (resourceName.startsWith("schemas/")) { + String tableName = + resourceName.substring("schemas/".length(), resourceName.length() - ".json".length()); + tableNames.add(tableName); } } diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java index c693dfd..8e8b3e6 100644 --- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java +++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsOptions.java 
@@ -20,11 +20,13 @@ package org.apache.beam.sdk.tpcds; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.Validation; /** Options used to configure TPC-DS test. */ public interface TpcdsOptions extends PipelineOptions { @Description( "The size of TPC-DS data to run query on, user input should contain the unit, such as '1G', '10G'") + @Validation.Required String getDataSize(); void setDataSize(String dataSize); @@ -41,4 +43,16 @@ public interface TpcdsOptions extends PipelineOptions { Integer getTpcParallel(); void setTpcParallel(Integer parallelism); + + @Description("The path to input data directory") + @Validation.Required + String getDataDirectory(); + + void setDataDirectory(String path); + + @Description("The path to directory with results") + @Validation.Required + String getResultsDirectory(); + + void setResultsDirectory(String path); } diff --git a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java index 1d597f0..651d6f0 100644 --- a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java +++ b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java @@ -19,6 +19,8 @@ package org.apache.beam.sdk.tpcds; import static org.junit.Assert.assertEquals; +import java.io.IOException; +import java.net.URISyntaxException; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -138,7 +140,7 @@ public class TableSchemaJSONLoaderTest { } @Test - public void testGetAllTableNames() { + public void testGetAllTableNames() throws IOException, URISyntaxException { List<String> tableNames = TableSchemaJSONLoader.getAllTableNames(); Collections.sort(tableNames); List<String> expectedTableNames =