This is an automated email from the ASF dual-hosted git repository. johnyangk pushed a commit to branch tpch-fix in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
commit 10470a09793bf8b999a3cf129b55d8fbf141f176 Author: John Yang <[email protected]> AuthorDate: Sun Sep 9 10:34:15 2018 +0900 tpch --- .../apache/nemo/examples/beam/tpch/Schemas.java | 353 +++++++++++++++++++++ .../org/apache/nemo/examples/beam/tpch/Tpch.java | 270 ++++++++++++++++ 2 files changed, 623 insertions(+) diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Schemas.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Schemas.java new file mode 100644 index 0000000..b464789 --- /dev/null +++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Schemas.java @@ -0,0 +1,353 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nemo.examples.beam.tpch;

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.schemas.Schema;

/**
 * Beam {@link Schema} definitions for the benchmark tables used by the SQL examples.
 * (Copied/Refined from the example code in the Beam repository)
 *
 * <p>The {@code UPPER_SNAKE_CASE} constants are built with the non-nullable
 * {@code addXxxField} builder methods; the {@code lowerCamelCase} constants are built
 * entirely from nullable fields. Columns whose trailing comment mentions {@code decimal}
 * are declared as {@code FLOAT} here.
 */
public final class Schemas {
  /**
   * Column-name prefix of each TPC-H table, keyed by table name.
   * Populated by the constructor; nothing in this class reads it.
   */
  private final ImmutableMap<String, String> columnPrefixes;

  /** Schema with the {@code ss_}-prefixed (store sales) columns. */
  public static final Schema storeSalesSchema =
      Schema.builder()
          .addNullableField("ss_sold_date_sk", Schema.FieldType.INT32)
          .addNullableField("ss_sold_time_sk", Schema.FieldType.INT32)
          .addNullableField("ss_item_sk", Schema.FieldType.INT32)
          // NOTE(review): the only *_sk column declared as STRING; every other key is INT32.
          // Confirm whether this is intentional before changing it.
          .addNullableField("ss_customer_sk", Schema.FieldType.STRING)
          .addNullableField("ss_cdemo_sk", Schema.FieldType.INT32)
          .addNullableField("ss_hdemo_sk", Schema.FieldType.INT32)
          .addNullableField("ss_addr_sk", Schema.FieldType.INT32)
          .addNullableField("ss_store_sk", Schema.FieldType.INT32)
          .addNullableField("ss_promo_sk", Schema.FieldType.INT32)
          .addNullableField("ss_ticket_number", Schema.FieldType.INT64)
          .addNullableField("ss_quantity", Schema.FieldType.INT32)
          .addNullableField("ss_wholesale_cost", Schema.FieldType.FLOAT)
          .addNullableField("ss_list_price", Schema.FieldType.FLOAT)
          .addNullableField("ss_sales_price", Schema.FieldType.FLOAT)
          .addNullableField("ss_ext_discount_amt", Schema.FieldType.FLOAT)
          .addNullableField("ss_ext_sales_price", Schema.FieldType.FLOAT)
          .addNullableField("ss_ext_wholesale_cost", Schema.FieldType.FLOAT)
          .addNullableField("ss_ext_list_price", Schema.FieldType.FLOAT)
          .addNullableField("ss_ext_tax", Schema.FieldType.FLOAT)
          .addNullableField("ss_coupon_amt", Schema.FieldType.FLOAT)
          .addNullableField("ss_net_paid", Schema.FieldType.FLOAT)
          .addNullableField("ss_net_paid_inc_tax", Schema.FieldType.FLOAT)
          .addNullableField("ss_net_profit", Schema.FieldType.FLOAT)
          .build();

  /** Schema with the {@code d_}-prefixed (date dimension) columns. */
  public static final Schema dateDimSchema =
      Schema.builder()
          .addNullableField("d_date_sk", Schema.FieldType.INT32)
          .addNullableField("d_date_id", Schema.FieldType.STRING)
          .addNullableField("d_date", Schema.FieldType.STRING)
          .addNullableField("d_month_seq", Schema.FieldType.INT32)
          .addNullableField("d_week_seq", Schema.FieldType.INT32)
          .addNullableField("d_quarter_seq", Schema.FieldType.INT32)
          .addNullableField("d_year", Schema.FieldType.INT32)
          .addNullableField("d_dow", Schema.FieldType.INT32)
          .addNullableField("d_moy", Schema.FieldType.INT32)
          .addNullableField("d_dom", Schema.FieldType.INT32)
          .addNullableField("d_qoy", Schema.FieldType.INT32)
          .addNullableField("d_fy_year", Schema.FieldType.INT32)
          .addNullableField("d_fy_quarter_seq", Schema.FieldType.INT32)
          .addNullableField("d_fy_week_seq", Schema.FieldType.INT32)
          .addNullableField("d_day_name", Schema.FieldType.STRING)
          .addNullableField("d_quarter_name", Schema.FieldType.STRING)
          .addNullableField("d_holiday", Schema.FieldType.STRING)
          .addNullableField("d_weekend", Schema.FieldType.STRING)
          .addNullableField("d_following_holiday", Schema.FieldType.STRING)
          .addNullableField("d_first_dom", Schema.FieldType.INT32)
          .addNullableField("d_last_dom", Schema.FieldType.INT32)
          .addNullableField("d_same_day_ly", Schema.FieldType.INT32)
          .addNullableField("d_same_day_lq", Schema.FieldType.INT32)
          .addNullableField("d_current_day", Schema.FieldType.STRING)
          .addNullableField("d_current_week", Schema.FieldType.STRING)
          .addNullableField("d_current_month", Schema.FieldType.STRING)
          .addNullableField("d_current_quarter", Schema.FieldType.STRING)
          .addNullableField("d_current_year", Schema.FieldType.STRING)
          .build();

  /** Schema with the {@code i_}-prefixed (item) columns. */
  public static final Schema itemSchema =
      Schema.builder()
          .addNullableField("i_item_sk", Schema.FieldType.INT32)
          .addNullableField("i_item_id", Schema.FieldType.STRING)
          .addNullableField("i_rec_start_date", Schema.FieldType.DATETIME) // date
          .addNullableField("i_rec_end_date", Schema.FieldType.DATETIME) // date
          .addNullableField("i_item_desc", Schema.FieldType.STRING)
          .addNullableField("i_current_price", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("i_wholesale_cost", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("i_brand_id", Schema.FieldType.INT32)
          .addNullableField("i_brand", Schema.FieldType.STRING)
          .addNullableField("i_class_id", Schema.FieldType.INT32)
          .addNullableField("i_class", Schema.FieldType.STRING)
          .addNullableField("i_category_id", Schema.FieldType.INT32)
          .addNullableField("i_category", Schema.FieldType.STRING)
          .addNullableField("i_manufact_id", Schema.FieldType.INT32)
          .addNullableField("i_manufact", Schema.FieldType.STRING)
          .addNullableField("i_size", Schema.FieldType.STRING)
          .addNullableField("i_formulation", Schema.FieldType.STRING)
          .addNullableField("i_color", Schema.FieldType.STRING)
          .addNullableField("i_units", Schema.FieldType.STRING)
          .addNullableField("i_container", Schema.FieldType.STRING)
          .addNullableField("i_manager_id", Schema.FieldType.INT32)
          .addNullableField("i_product_name", Schema.FieldType.STRING)
          .build();

  /** Schema with the {@code inv_}-prefixed (inventory) columns. */
  public static final Schema inventorySchema =
      Schema.builder()
          .addNullableField("inv_date_sk", Schema.FieldType.INT32)
          .addNullableField("inv_item_sk", Schema.FieldType.INT32)
          .addNullableField("inv_warehouse_sk", Schema.FieldType.INT32)
          .addNullableField("inv_quantity_on_hand", Schema.FieldType.INT32)
          .build();

  /** Schema with the {@code cs_}-prefixed (catalog sales) columns. */
  public static final Schema catalogSalesSchema =
      Schema.builder()
          .addNullableField("cs_sold_date_sk", Schema.FieldType.INT32)
          .addNullableField("cs_sold_time_sk", Schema.FieldType.INT32)
          .addNullableField("cs_ship_date_sk", Schema.FieldType.INT32)
          .addNullableField("cs_bill_customer_sk", Schema.FieldType.INT32)
          .addNullableField("cs_bill_cdemo_sk", Schema.FieldType.INT32)
          .addNullableField("cs_bill_hdemo_sk", Schema.FieldType.INT32)
          .addNullableField("cs_bill_addr_sk", Schema.FieldType.INT32)
          .addNullableField("cs_ship_customer_sk", Schema.FieldType.INT32)
          .addNullableField("cs_ship_cdemo_sk", Schema.FieldType.INT32)
          .addNullableField("cs_ship_hdemo_sk", Schema.FieldType.INT32)
          .addNullableField("cs_ship_addr_sk", Schema.FieldType.INT32)
          .addNullableField("cs_call_center_sk", Schema.FieldType.INT32)
          .addNullableField("cs_catalog_page_sk", Schema.FieldType.INT32)
          .addNullableField("cs_ship_mode_sk", Schema.FieldType.INT32)
          .addNullableField("cs_warehouse_sk", Schema.FieldType.INT32)
          .addNullableField("cs_item_sk", Schema.FieldType.INT32)
          .addNullableField("cs_promo_sk", Schema.FieldType.INT32)
          .addNullableField("cs_order_number", Schema.FieldType.INT64)
          .addNullableField("cs_quantity", Schema.FieldType.INT32)
          .addNullableField("cs_wholesale_cost", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("cs_list_price", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("cs_sales_price", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("cs_ext_discount_amt", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("cs_ext_sales_price", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("cs_ext_wholesale_cost", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("cs_ext_list_price", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("cs_ext_tax", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("cs_coupon_amt", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("cs_ext_ship_cost", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("cs_net_paid", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("cs_net_paid_inc_tax", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("cs_net_paid_inc_ship", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("cs_net_paid_inc_ship_tax", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("cs_net_profit", Schema.FieldType.FLOAT) // decimal(7,2)
          .build();

  /** Schema with the {@code o_}-prefixed (orders) columns. */
  public static final Schema ORDER_SCHEMA =
      Schema.builder()
          .addInt32Field("o_orderkey")
          .addInt32Field("o_custkey")
          .addStringField("o_orderstatus")
          .addFloatField("o_totalprice")
          .addStringField("o_orderdate")
          .addStringField("o_orderpriority")
          .addStringField("o_clerk")
          .addInt32Field("o_shippriority")
          .addStringField("o_comment")
          .build();

  /** Schema with the TPC-H style {@code c_}-prefixed (customer) columns. */
  public static final Schema CUSTOMER_SCHEMA =
      Schema.builder()
          .addInt32Field("c_custkey")
          .addStringField("c_name")
          .addStringField("c_address")
          .addInt32Field("c_nationkey")
          .addStringField("c_phone")
          .addFloatField("c_acctbal")
          .addStringField("c_mktsegment")
          .addStringField("c_comment")
          .build();

  /**
   * Schema with the nullable {@code c_}-prefixed customer columns ({@code c_customer_sk}, ...).
   * Despite the {@code get} prefix this is a field, not a method; the name is kept so that
   * existing references keep compiling.
   */
  public static final Schema getCustomerDsSchema =
      Schema.builder()
          .addNullableField("c_customer_sk", Schema.FieldType.INT32)
          .addNullableField("c_customer_id", Schema.FieldType.STRING)
          .addNullableField("c_current_cdemo_sk", Schema.FieldType.INT32)
          .addNullableField("c_current_hdemo_sk", Schema.FieldType.INT32)
          .addNullableField("c_current_addr_sk", Schema.FieldType.INT32)
          .addNullableField("c_first_shipto_date_sk", Schema.FieldType.INT32)
          .addNullableField("c_first_sales_date_sk", Schema.FieldType.INT32)
          .addNullableField("c_salutation", Schema.FieldType.STRING)
          .addNullableField("c_first_name", Schema.FieldType.STRING)
          .addNullableField("c_last_name", Schema.FieldType.STRING)
          .addNullableField("c_preferred_cust_flag", Schema.FieldType.STRING)
          .addNullableField("c_birth_day", Schema.FieldType.INT32)
          .addNullableField("c_birth_month", Schema.FieldType.INT32)
          .addNullableField("c_birth_year", Schema.FieldType.INT32)
          .addNullableField("c_birth_country", Schema.FieldType.STRING)
          .addNullableField("c_login", Schema.FieldType.STRING)
          .addNullableField("c_email_address", Schema.FieldType.STRING)
          .addNullableField("c_last_review_date", Schema.FieldType.STRING)
          .build();

  /** Schema with the {@code l_}-prefixed (lineitem) columns; the date columns are strings. */
  public static final Schema LINEITEM_SCHEMA =
      Schema.builder()
          .addInt32Field("l_orderkey")
          .addInt32Field("l_partkey")
          .addInt32Field("l_suppkey")
          .addInt32Field("l_linenumber")
          .addFloatField("l_quantity")
          .addFloatField("l_extendedprice")
          .addFloatField("l_discount")
          .addFloatField("l_tax")
          .addStringField("l_returnflag")
          .addStringField("l_linestatus")
          .addStringField("l_shipdate")
          .addStringField("l_commitdate")
          .addStringField("l_receiptdate")
          .addStringField("l_shipinstruct")
          .addStringField("l_shipmode")
          .addStringField("l_comment")
          .build();

  /** Schema with the {@code ps_}-prefixed (partsupp) columns. */
  public static final Schema PARTSUPP_SCHEMA =
      Schema.builder()
          .addInt32Field("ps_partkey") // identifier
          .addInt32Field("ps_suppkey") // identifier
          .addInt32Field("ps_availqty") // integer
          .addFloatField("ps_supplycost") // decimal
          .addStringField("ps_comment") // variable text, size 199
          .build();

  /** Schema with the {@code r_}-prefixed (region) columns. */
  public static final Schema REGION_SCHEMA =
      Schema.builder()
          .addInt32Field("r_regionkey") // identifier
          .addStringField("r_name") // fixed text, size 25
          .addStringField("r_comment") // variable text, size 152
          .build();

  /** Schema with the {@code s_}-prefixed (supplier) columns. */
  public static final Schema SUPPLIER_SCHEMA =
      Schema.builder()
          .addInt32Field("s_suppkey") // identifier
          .addStringField("s_name") // fixed text, size 25
          .addStringField("s_address") // variable text, size 40
          .addInt32Field("s_nationkey") // identifier
          .addStringField("s_phone") // fixed text, size 15
          .addFloatField("s_acctbal") // decimal
          .addStringField("s_comment") // variable text, size 101
          .build();

  /** Schema with the TPC-H style {@code p_}-prefixed (part) columns. */
  public static final Schema PART_SCHEMA =
      Schema.builder()
          .addInt32Field("p_partkey")
          .addStringField("p_name")
          .addStringField("p_mfgr")
          .addStringField("p_brand")
          .addStringField("p_type")
          .addInt32Field("p_size")
          .addStringField("p_container")
          .addFloatField("p_retailprice")
          .addStringField("p_comment")
          .build();

  /** Schema with the {@code n_}-prefixed (nation) columns. */
  public static final Schema NATION_SCHEMA =
      Schema.builder()
          .addInt32Field("n_nationkey")
          .addStringField("n_name")
          .addInt32Field("n_regionkey")
          .addStringField("n_comment")
          .build();

  /** Schema with the nullable {@code p_}-prefixed (promotion) columns. */
  public static final Schema promotionSchema =
      Schema.builder()
          .addNullableField("p_promo_sk", Schema.FieldType.INT32)
          .addNullableField("p_promo_id", Schema.FieldType.STRING)
          .addNullableField("p_start_date_sk", Schema.FieldType.INT32)
          .addNullableField("p_end_date_sk", Schema.FieldType.INT32)
          .addNullableField("p_item_sk", Schema.FieldType.INT32)
          .addNullableField("p_cost", Schema.FieldType.FLOAT) // decimal(15,2)
          .addNullableField("p_response_target", Schema.FieldType.INT32)
          .addNullableField("p_promo_name", Schema.FieldType.STRING)
          .addNullableField("p_channel_dmail", Schema.FieldType.STRING)
          .addNullableField("p_channel_email", Schema.FieldType.STRING)
          .addNullableField("p_channel_catalog", Schema.FieldType.STRING)
          .addNullableField("p_channel_tv", Schema.FieldType.STRING)
          .addNullableField("p_channel_radio", Schema.FieldType.STRING)
          .addNullableField("p_channel_press", Schema.FieldType.STRING)
          .addNullableField("p_channel_event", Schema.FieldType.STRING)
          .addNullableField("p_channel_demo", Schema.FieldType.STRING)
          .addNullableField("p_channel_details", Schema.FieldType.STRING)
          .addNullableField("p_purpose", Schema.FieldType.STRING)
          .addNullableField("p_discount_active", Schema.FieldType.STRING)
          .build();

  /** Schema with the {@code cd_}-prefixed (customer demographics) columns. */
  public static final Schema customerDemographicsSchema =
      Schema.builder()
          .addNullableField("cd_demo_sk", Schema.FieldType.INT32)
          .addNullableField("cd_gender", Schema.FieldType.STRING)
          .addNullableField("cd_marital_status", Schema.FieldType.STRING)
          .addNullableField("cd_education_status", Schema.FieldType.STRING)
          .addNullableField("cd_purchase_estimate", Schema.FieldType.INT32)
          .addNullableField("cd_credit_rating", Schema.FieldType.STRING)
          .addNullableField("cd_dep_count", Schema.FieldType.INT32)
          .addNullableField("cd_dep_employed_count", Schema.FieldType.INT32)
          .addNullableField("cd_dep_college_count", Schema.FieldType.INT32)
          .build();

  /** Schema with the {@code ws_}-prefixed (web sales) columns. */
  public static final Schema webSalesSchema =
      Schema.builder()
          .addNullableField("ws_sold_date_sk", Schema.FieldType.INT32)
          .addNullableField("ws_sold_time_sk", Schema.FieldType.INT32)
          .addNullableField("ws_ship_date_sk", Schema.FieldType.INT32)
          .addNullableField("ws_item_sk", Schema.FieldType.INT32)
          .addNullableField("ws_bill_customer_sk", Schema.FieldType.INT32)
          .addNullableField("ws_bill_cdemo_sk", Schema.FieldType.INT32)
          .addNullableField("ws_bill_hdemo_sk", Schema.FieldType.INT32)
          .addNullableField("ws_bill_addr_sk", Schema.FieldType.INT32)
          .addNullableField("ws_ship_customer_sk", Schema.FieldType.INT32)
          .addNullableField("ws_ship_cdemo_sk", Schema.FieldType.INT32)
          .addNullableField("ws_ship_hdemo_sk", Schema.FieldType.INT32)
          .addNullableField("ws_ship_addr_sk", Schema.FieldType.INT32)
          .addNullableField("ws_web_page_sk", Schema.FieldType.INT32)
          .addNullableField("ws_web_site_sk", Schema.FieldType.INT32)
          .addNullableField("ws_ship_mode_sk", Schema.FieldType.INT32)
          .addNullableField("ws_warehouse_sk", Schema.FieldType.INT32)
          .addNullableField("ws_promo_sk", Schema.FieldType.INT32)
          .addNullableField("ws_order_number", Schema.FieldType.INT64)
          .addNullableField("ws_quantity", Schema.FieldType.INT32)
          .addNullableField("ws_wholesale_cost", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("ws_list_price", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("ws_sales_price", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("ws_ext_discount_amt", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("ws_ext_sales_price", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("ws_ext_wholesale_cost", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("ws_ext_list_price", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("ws_ext_tax", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("ws_coupon_amt", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("ws_ext_ship_cost", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("ws_net_paid", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("ws_net_paid_inc_tax", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("ws_net_paid_inc_ship", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("ws_net_paid_inc_ship_tax", Schema.FieldType.FLOAT) // decimal(7,2)
          .addNullableField("ws_net_profit", Schema.FieldType.FLOAT) // decimal(7,2)
          .build();

  /**
   * Creates an instance holding the table-name to column-prefix mapping of the eight
   * TPC-H tables. The schema constants above are static and do not need an instance.
   */
  public Schemas() {
    this.columnPrefixes =
        ImmutableMap.<String, String>builder()
            .put("lineitem", "l_")
            .put("customer", "c_")
            .put("supplier", "s_")
            .put("partsupp", "ps_")
            .put("part", "p_")
            .put("orders", "o_")
            .put("nation", "n_")
            .put("region", "r_")
            .build();
  }
}
diff --git
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java new file mode 100644 index 0000000..4ebf0f3 --- /dev/null +++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nemo.examples.beam.tpch; + +import com.google.common.collect.ImmutableMap; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.sql.SqlTransform; +import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTable; +import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.*; +import org.apache.commons.csv.CSVFormat; +import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions; +import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beamRow2CsvLine; + +/** + * A simple SQL application. 
+ * (Copied/Refined from the example code in the Beam repository) + */ +public final class Tpch { + private static final Logger LOG = LoggerFactory.getLogger(Tpch.class.getName()); + + public static final String QUERY1 = + "select\n" + + "\tl_returnflag,\n" + + "\tl_linestatus,\n" + + "\tsum(l_quantity) as sum_qty,\n" + + "\tsum(l_extendedprice) as sum_base_price,\n" + + "\tsum(l_extendedprice * (1 - l_discount)) as sum_disc_price,\n" + + "\tsum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,\n" + + "\tavg(l_quantity) as avg_qty,\n" + + "\tavg(l_extendedprice) as avg_price,\n" + + "\tavg(l_discount) as avg_disc,\n" + + "\tcount(*) as count_order\n" + + "from\n" + + "\tlineitem\n" + + "where\n" + + "\tl_shipdate <= date '1998-12-01' - interval '90' day (3)\n" + + "group by\n" + + "\tl_returnflag,\n" + + "\tl_linestatus\n" + + "order by\n" + + "\tl_returnflag,\n" + + "\tl_linestatus limit 10"; + + public static final String QUERY3 = + "select\n" + + "\tl_orderkey,\n" + + "\tsum(l_extendedprice * (1 - l_discount)) as revenue,\n" + + "\to_orderdate,\n" + + "\to_shippriority\n" + + "from\n" + + "\tcustomer,\n" + + "\torders,\n" + + "\tlineitem\n" + + "where\n" + + "\tc_mktsegment = 'BUILDING'\n" + + "\tand c_custkey = o_custkey\n" + + "\tand l_orderkey = o_orderkey\n" + + "\tand o_orderdate < date '1995-03-15'\n" + + "\tand l_shipdate > date '1995-03-15'\n" + + "group by\n" + + "\tl_orderkey,\n" + + "\to_orderdate,\n" + + "\to_shippriority\n" + + "order by\n" + + "\trevenue desc,\n" + + "\to_orderdate\n" + + "limit 10"; + + public static final String QUERY4 = + "select\n" + + "\to_orderpriority,\n" + + "\tcount(*) as order_count\n" + + "from\n" + + "\torders\n" + + "where\n" + + "\to_orderdate >= date '1993-07-01'\n" + + "\tand o_orderdate < date '1993-07-01' + interval '3' month\n" + + "\tand exists (\n" + + "\t\tselect\n" + + "\t\t\t*\n" + + "\t\tfrom\n" + + "\t\t\tlineitem\n" + + "\t\twhere\n" + + "\t\t\tl_orderkey = o_orderkey\n" + + "\t\t\tand 
l_commitdate < l_receiptdate\n" + + "\t)\n" + + "group by\n" + + "\to_orderpriority\n" + + "order by\n" + + "\to_orderpriority limit 10"; + + public static final String QUERY5 = + "select\n" + + "\tn_name,\n" + + "\tsum(l_extendedprice * (1 - l_discount)) as revenue\n" + + "from\n" + + "\tcustomer,\n" + + "\torders,\n" + + "\tlineitem,\n" + + "\tsupplier,\n" + + "\tnation,\n" + + "\tregion\n" + + "where\n" + + "\tc_custkey = o_custkey\n" + + "\tand l_orderkey = o_orderkey\n" + + "\tand l_suppkey = s_suppkey\n" + + "\tand c_nationkey = s_nationkey\n" + + "\tand s_nationkey = n_nationkey\n" + + "\tand n_regionkey = r_regionkey\n" + + "\tand r_name = 'ASIA'\n" + + "\tand o_orderdate >= date '1994-01-01'\n" + + "\tand o_orderdate < date '1994-01-01' + interval '1' year\n" + + "group by\n" + + "\tn_name\n" + + "order by\n" + + "\trevenue desc limit 10"; + + public static final String QUERY6 = + "select\n" + + "\tsum(l_extendedprice * l_discount) as revenue\n" + + "from\n" + + "\tlineitem\n" + + "where\n" + + "\tl_shipdate >= date '1994-01-01'\n" + + "\tand l_shipdate < date '1994-01-01' + interval '1' year\n" + + "\tand l_discount between .06 - 0.01 and .06 + 0.01\n" + + "\tand l_quantity < 24 limit 10"; + + + /** + * Private Constructor. 
+ */ + private Tpch() { + } + + static class RowToCsv extends PTransform<PCollection<Row>, PCollection<String>> implements Serializable { + + private CSVFormat csvFormat; + + public RowToCsv(CSVFormat csvFormat) { + this.csvFormat = csvFormat; + } + + public CSVFormat getCsvFormat() { + return csvFormat; + } + + @Override + public PCollection<String> expand(PCollection<Row> input) { + return input.apply( + "rowToCsv", + MapElements.into(TypeDescriptors.strings()).via(row -> beamRow2CsvLine(row, csvFormat))); + } + } + + private static PCollectionTuple getHTables(final Pipeline pipeline, + final CSVFormat csvFormat, + final String inputDirectory) { + final ImmutableMap<String, Schema> hSchemas = ImmutableMap.<String, Schema>builder() + //.put("customer", Schemas.CUSTOMER_SCHEMA) + .put("lineitem", Schemas.LINEITEM_SCHEMA) + //.put("nation", Schemas.NATION_SCHEMA) + //.put("orders", Schemas.ORDER_SCHEMA) + //.put("part", Schemas.PART_SCHEMA) + //.put("partsupp", Schemas.PARTSUPP_SCHEMA) + //.put("region", Schemas.REGION_SCHEMA) + //.put("supplier", Schemas.SUPPLIER_SCHEMA) + // .put("store_sales", Schemas.storeSalesSchema) + // .put("catalog_sales", Schemas.catalogSalesSchema) + // .put("item", Schemas.itemSchema) + // .put("date_dim", Schemas.dateDimSchema) + // .put("promotion", Schemas.promotionSchema) + // .put("customer_demographics", Schemas.customerDemographicsSchema) + // .put("web_sales", Schemas.webSalesSchema) + // .put("inventory", Schemas.inventorySchema) + .build(); + + PCollectionTuple tables = PCollectionTuple.empty(pipeline); + for (final Map.Entry<String, Schema> tableSchema : hSchemas.entrySet()) { + final String filePattern = inputDirectory + tableSchema.getKey() + ".tbl"; + final PCollection<Row> table = new TextTable( + tableSchema.getValue(), + filePattern, + new TextTableProvider.CsvToRow(tableSchema.getValue(), csvFormat), + new RowToCsv(csvFormat)) + .buildIOReader(pipeline.begin()) + .setCoder(tableSchema.getValue().getRowCoder()) + 
.setName(tableSchema.getKey()); + tables = tables.and(new TupleTag<>(tableSchema.getKey()), table); + + LOG.info("FilePattern {} / Tables {}", filePattern, tables); + } + return tables; + } + + + /** + * @param args arguments. + */ + public static void main(final String[] args) { + final int queryId = Integer.valueOf(args[0]); + final String inputDirectory = args[1]; + final String outputFilePath = args[2]; + + final Map<Integer, String> idToQuery = new HashMap<>(); + idToQuery.put(1, QUERY1); + idToQuery.put(3, QUERY3); + idToQuery.put(4, QUERY4); + idToQuery.put(5, QUERY5); + idToQuery.put(6, QUERY6); + + LOG.info("{} / {}", queryId, inputDirectory, outputFilePath); + + final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class); + options.setRunner(NemoPipelineRunner.class); + options.setJobName("SimpleSQL"); + final Pipeline p = Pipeline.create(options); + + // Create tables + final CSVFormat csvFormat = CSVFormat.MYSQL.withDelimiter('|').withNullString(""); + final PCollectionTuple tables = getHTables(p, csvFormat, inputDirectory); + + // Run the TPC-H query + final PCollection<Row> result = tables.apply(SqlTransform.query(idToQuery.get(queryId))); + + /* + final PCollection<String> resultToWrite = result.apply(MapElements.into(TypeDescriptors.strings()).via( + new SerializableFunction<Row, String>() { + @Override + public String apply(Row input) { + System.out.println("row: " + input.getValues()); + return "row: " + input.getValues(); + } + })); + + GenericSourceSink.write(resultToWrite, outputFilePath); + */ + + // Then run + p.run(); + } +}
