This is an automated email from the ASF dual-hosted git repository. johnyangk pushed a commit to branch tpch-fix in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
commit 5ff7062aee00d5a8ec74935e2d4330da71398127 Author: John Yang <[email protected]> AuthorDate: Thu Sep 13 10:25:36 2018 +0900 load only the used tables --- .../org/apache/nemo/examples/beam/tpch/Tpch.java | 159 ++++----------------- 1 file changed, 28 insertions(+), 131 deletions(-) diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java index 16a9297..bf1393c 100644 --- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java +++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java @@ -38,10 +38,7 @@ import java.io.IOException; import java.io.Serializable; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -54,115 +51,6 @@ import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beam public final class Tpch { private static final Logger LOG = LoggerFactory.getLogger(Tpch.class.getName()); - public static final String QUERY1 = - "select\n" - + "\tl_returnflag,\n" - + "\tl_linestatus,\n" - + "\tsum(l_quantity) as sum_qty,\n" - + "\tsum(l_extendedprice) as sum_base_price,\n" - + "\tsum(l_extendedprice * (1 - l_discount)) as sum_disc_price,\n" - + "\tsum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,\n" - + "\tavg(l_quantity) as avg_qty,\n" - + "\tavg(l_extendedprice) as avg_price,\n" - + "\tavg(l_discount) as avg_disc,\n" - + "\tcount(*) as count_order\n" - + "from\n" - + "\tlineitem\n" - + "where\n" - + "\tl_shipdate <= date '1998-12-01' - interval '90' day (3)\n" - + "group by\n" - + "\tl_returnflag,\n" - + "\tl_linestatus\n" - + "order by\n" - + "\tl_returnflag,\n" - + "\tl_linestatus limit 10"; - - public static final String QUERY3 = - "select\n" - + "\tl_orderkey,\n" - + "\tsum(l_extendedprice * (1 - l_discount)) as revenue,\n" - + "\to_orderdate,\n" - + "\to_shippriority\n" - + "from\n" - + "\tcustomer,\n" - + "\torders,\n" - + "\tlineitem\n" - + "where\n" - + "\tc_mktsegment = 'BUILDING'\n" - + "\tand c_custkey = o_custkey\n" - + "\tand l_orderkey = o_orderkey\n" - + "\tand o_orderdate < date '1995-03-15'\n" - + "\tand l_shipdate > date '1995-03-15'\n" - + "group by\n" - + "\tl_orderkey,\n" - + "\to_orderdate,\n" - + "\to_shippriority\n" - + "order by\n" - + "\trevenue desc,\n" - + "\to_orderdate\n" - + "limit 10"; - - public static final String QUERY4 = - "select\n" - + "\to_orderpriority,\n" - + "\tcount(*) as order_count\n" - + "from\n" - + "\torders\n" - + "where\n" - + "\to_orderdate >= date '1993-07-01'\n" - + "\tand o_orderdate < date '1993-07-01' + interval '3' month\n" - + "\tand exists (\n" - + "\t\tselect\n" - + "\t\t\t*\n" - + "\t\tfrom\n" - + "\t\t\tlineitem\n" - + "\t\twhere\n" - + "\t\t\tl_orderkey = o_orderkey\n" - + "\t\t\tand l_commitdate < l_receiptdate\n" - + "\t)\n" - + "group by\n" - + "\to_orderpriority\n" - + "order by\n" - + "\to_orderpriority limit 10"; - - public static final String QUERY5 = - "select\n" - + "\tn_name,\n" - + "\tsum(l_extendedprice * (1 - l_discount)) as revenue\n" - + "from\n" - + "\tcustomer,\n" - + "\torders,\n" - + "\tlineitem,\n" - + "\tsupplier,\n" - + "\tnation,\n" - + "\tregion\n" - + "where\n" - + "\tc_custkey = o_custkey\n" - + "\tand l_orderkey = o_orderkey\n" - + "\tand l_suppkey = s_suppkey\n" - + "\tand c_nationkey = s_nationkey\n" - + "\tand s_nationkey = n_nationkey\n" - + "\tand n_regionkey = r_regionkey\n" - + "\tand r_name = 'ASIA'\n" - + "\tand o_orderdate >= date '1994-01-01'\n" - + "\tand o_orderdate < date '1994-01-01' + interval '1' year\n" - + "group by\n" - + "\tn_name\n" - + "order by\n" - + "\trevenue desc limit 10"; - - public static final String QUERY6 = - "select\n" - + "\tsum(l_extendedprice * l_discount) as revenue\n" - + "from\n" - + "\tlineitem\n" - + "where\n" - + "\tl_shipdate >= date '1994-01-01'\n" - + "\tand l_shipdate < date '1994-01-01' + interval '1' year\n" - + "\tand l_discount between .06 - 0.01 and .06 + 0.01\n" - + "\tand l_quantity < 24 limit 10"; - - /** * Private Constructor. */ @@ -194,7 +82,8 @@ public final class Tpch { private static PCollectionTuple getHTables(final Pipeline pipeline, final CSVFormat csvFormat, - final String inputDirectory) { + final String inputDirectory, + final String query) { final ImmutableMap<String, Schema> hSchemas = ImmutableMap.<String, Schema>builder() .put("lineitem", Schemas.LINEITEM_SCHEMA) .put("customer", Schemas.CUSTOMER_SCHEMA) @@ -220,14 +109,28 @@ public final class Tpch { PCollectionTuple tables = PCollectionTuple.empty(pipeline); for (final Map.Entry<String, Schema> tableSchema : hSchemas.entrySet()) { - final String filePattern = inputDirectory + tableSchema.getKey() + ".tbl"; - final PCollection<Row> table = GenericSourceSink.read(pipeline, filePattern) - .apply("StringToRow", new TextTableProvider.CsvToRow(tableSchema.getValue(), csvFormat)) - .setCoder(tableSchema.getValue().getRowCoder()) - .setName(tableSchema.getKey()); - tables = tables.and(new TupleTag<>(tableSchema.getKey()), table); - - LOG.info("FilePattern {} / Tables {}", filePattern, tables); + final String tableName = tableSchema.getKey(); + + final List<String> tokens = Arrays.asList(query.split(" ")); + LOG.info("Tokens are {}", tokens); + + if (tokens.contains(tableName)) { + LOG.info("HIT: tablename {}", tableName); + + final String filePattern = inputDirectory + tableSchema.getKey() + ".tbl"; + final PCollection<Row> table = GenericSourceSink.read(pipeline, filePattern) + .apply("StringToRow", new TextTableProvider.CsvToRow(tableSchema.getValue(), csvFormat)) + .setCoder(tableSchema.getValue().getRowCoder()) + .setName(tableSchema.getKey()); + tables = tables.and(new TupleTag<>(tableSchema.getKey()), table); + + LOG.info("FilePattern {} / Tables {}", filePattern, tables); + } + + + + + } return tables; } @@ -241,13 +144,6 @@ public final class Tpch { final String inputDirectory = args[1]; final String outputFilePath = args[2]; - final Map<Integer, String> idToQuery = new HashMap<>(); - idToQuery.put(1, QUERY1); - idToQuery.put(3, QUERY3); - idToQuery.put(4, QUERY4); - idToQuery.put(5, QUERY5); - idToQuery.put(6, QUERY6); - LOG.info("{} / {} / {}", queryFilePath, inputDirectory, outputFilePath); final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class); @@ -255,15 +151,16 @@ public final class Tpch { options.setJobName("TPC-H"); final Pipeline p = Pipeline.create(options); + final String queryString = getQueryString(queryFilePath); // Create tables final CSVFormat csvFormat = CSVFormat.MYSQL .withDelimiter('|') .withNullString("") .withTrailingDelimiter(); - final PCollectionTuple tables = getHTables(p, csvFormat, inputDirectory); + final PCollectionTuple tables = getHTables(p, csvFormat, inputDirectory, queryString); // Run the TPC-H query - final PCollection<Row> result = tables.apply(SqlTransform.query(getQueryString(queryFilePath))); + final PCollection<Row> result = tables.apply(SqlTransform.query(queryString)); final PCollection<String> resultToWrite = result.apply(MapElements.into(TypeDescriptors.strings()).via( new SerializableFunction<Row, String>() {
