StephanEwen commented on a change in pull request #10239: [Flink-11491][Test] 
Support all TPC-DS queries
URL: https://github.com/apache/flink/pull/10239#discussion_r349033380
 
 

 ##########
 File path: 
flink-end-to-end-tests/flink-tpcds-test/src/main/java/org/apache/flink/table/tpcds/TpcdsTestProgram.java
 ##########
 @@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.tpcds;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.catalog.ConnectorCatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
+import org.apache.flink.table.sinks.CsvTableSink;
+import org.apache.flink.table.sources.CsvTableSource;
+import org.apache.flink.table.tpcds.schema.TpcdsSchema;
+import org.apache.flink.table.tpcds.schema.TpcdsSchemaProvider;
+import org.apache.flink.table.tpcds.stats.TpcdsStatsProvider;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * End-to-end test for TPC-DS.
+ */
+public class TpcdsTestProgram {
+
+       private static final List<String> TCPDS_TABLES = Arrays.asList(
+               "catalog_sales", "catalog_returns", "inventory", "store_sales",
+               "store_returns", "web_sales", "web_returns", "call_center", 
"catalog_page",
+               "customer", "customer_address", "customer_demographics", 
"date_dim",
+               "household_demographics", "income_band", "item", "promotion", 
"reason",
+               "ship_mode", "store", "time_dim", "warehouse", "web_page", 
"web_site"
+       );
+       private static final List<String> TPCDS_QUERIES = Arrays.asList(
+               "1", "2", "3", "4", "5", "6", "7", "8", "9", "10",
+               "11", "12", "13", "14a", "14b", "15", "16", "17", "18", "19", 
"20",
+               "21", "22", "23a", "23b", "24a", "24b", "25", "26", "27", "28", 
"29", "30",
+               "31", "32", "33", "34", "35", "36", "37", "38", "39a", "39b", 
"40",
+               "41", "42", "43", "44", "45", "46", "47", "48", "49", "50",
+               "51", "52", "53", "54", "55", "56", "57", "58", "59", "60",
+               "61", "62", "63", "64", "65", "66", "67", "68", "69", "70",
+               "71", "72", "73", "74", "75", "76", "77", "78", "79", "80",
+               "81", "82", "83", "84", "85", "86", "87", "88", "89", "90",
+               "91", "92", "93", "94", "95", "96", "97", "98", "99"
+       );
+
+       private static final String QUERY_PREFIX = "query";
+       private static final String QUERY_SUFFIX = ".sql";
+       private static final String DATA_SUFFIX = ".dat";
+       private static final String RESULT_SUFFIX = ".ans";
+       private static final String COL_DELIMITER = "|";
+       private static final String FILE_SEPARATOR = "/";
+
+       public static void main(String[] args) throws Exception {
+               ParameterTool params = ParameterTool.fromArgs(args);
+               String sourceTablePath = params.getRequired("sourceTablePath");
+               String queryPath = params.getRequired("queryPath");
+               String sinkTablePath = params.getRequired("sinkTablePath");
+               Boolean useTableStats = params.getBoolean("useTableStats");
+
+               //execute TPC-DS queries
+               for (String queryId : TPCDS_QUERIES) {
+                       System.out.println("[INFO]Run TPC-DS query " + queryId 
+ " ...");
+                       TableEnvironment tableEnvironment = 
prepareTableEnv(sourceTablePath, useTableStats);
+                       String queryName = QUERY_PREFIX + queryId + 
QUERY_SUFFIX;
+                       String queryFilePath = queryPath + FILE_SEPARATOR + 
queryName;
+                       String queryString = loadFile2String(queryFilePath);
+                       Table resultTable = 
tableEnvironment.sqlQuery(queryString);
+
+                       //register sink table
+                       String sinkTableName = QUERY_PREFIX + queryId + 
"_sinkTable";
+                       tableEnvironment.registerTableSink(sinkTableName,
+                               new CsvTableSink(
+                                       sinkTablePath + FILE_SEPARATOR + 
queryId + RESULT_SUFFIX,
+                                       COL_DELIMITER,
+                                       1,
+                                       FileSystem.WriteMode.OVERWRITE)
+                                       .configure(
+                                               
resultTable.getSchema().getFieldNames(),
+                                               
Arrays.stream(resultTable.getSchema().getFieldDataTypes())
+                                                       .map(r -> 
TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(r))
+                                                       
.collect(Collectors.toList())
+                                                       .toArray(new 
TypeInformation[0])
+                                       ));
+                       tableEnvironment.insertInto(resultTable, sinkTableName);
+                       tableEnvironment.execute(queryName);
+                       System.out.println("[INFO]Run TPC-DS query " + queryId 
+ " success.");
+               }
+       }
+
+       /**
+        * Prepare TableEnvironment for query.
+        *
+        * @param sourceTablePath
+        * @return
+        */
+       private static TableEnvironment prepareTableEnv(String sourceTablePath, 
Boolean useTableStats) throws Exception {
+               //init Table Env
+               EnvironmentSettings environmentSettings = EnvironmentSettings
+                       .newInstance()
+                       .useBlinkPlanner()
+                       .inBatchMode()
+                       .build();
+               TableEnvironment tEnv = 
TableEnvironment.create(environmentSettings);
+
+               //config Optimizer parameters
+               tEnv.getConfig().getConfiguration()
+                       
.setString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE, 
ShuffleMode.BATCH.toString());
+               tEnv.getConfig().getConfiguration()
+                       
.setString(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY, "32 mb");
+               tEnv.getConfig().getConfiguration()
+                       
.setString(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY, "32 
mb");
+               tEnv.getConfig().getConfiguration()
+                       
.setString(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SORT_MEMORY, "32 mb");
+               tEnv.getConfig().getConfiguration()
+                       
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
+               tEnv.getConfig().getConfiguration()
+                       
.setLong(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, 10 * 
1024 * 1024);
+               tEnv.getConfig().getConfiguration()
+                       
.setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED, true);
+
+               //register TPC-DS tables
+               TCPDS_TABLES.forEach(table -> {
+                       TpcdsSchema schema = 
TpcdsSchemaProvider.getTableSchema(table);
+                       CsvTableSource.Builder builder = 
CsvTableSource.builder();
+                       builder.path(sourceTablePath + FILE_SEPARATOR + table + 
DATA_SUFFIX);
+                       for (int i = 0; i < schema.getFieldNames().size(); i++) 
{
+                               builder.field(
+                                       schema.getFieldNames().get(i),
+                                       
TypeConversions.fromDataTypeToLegacyInfo(schema.getFieldTypes().get(i)));
+                       }
+                       builder.fieldDelimiter(COL_DELIMITER);
+                       builder.lineDelimiter("\n");
+                       CsvTableSource tableSource = builder.build();
+                       ConnectorCatalogTable catalogTable = 
ConnectorCatalogTable.source(tableSource, true);
+                       try {
+                               tEnv.getCatalog(tEnv.getCurrentCatalog()).get()
+                                       .createTable(new 
ObjectPath(tEnv.getCurrentDatabase(), table), catalogTable, false);
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                       }
+               });
+               // register statistics info
+               if (useTableStats) {
+                       TpcdsStatsProvider.registerTpcdsStats(tEnv);
+               }
+               return tEnv;
+       }
+
+       private static String loadFile2String(String filePath) {
+               StringBuilder stringBuilder = new StringBuilder();
+               try {
+                       Stream<String> stream = 
Files.lines(Paths.get(filePath), StandardCharsets.UTF_8);
+                       stream.forEach(s -> 
stringBuilder.append(s).append("\n"));
 
 Review comment:
   Nit: you can append a char here, not a string. Slightly more lightweight).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to