This is an automated email from the ASF dual-hosted git repository. johnyangk pushed a commit to branch tpch-fix in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
commit 413ba2906212e4bc7691ae84f07a7fb6034b7d90 Author: John Yang <[email protected]> AuthorDate: Sun Sep 9 11:22:05 2018 +0900 tpch setup --- .../apache/nemo/examples/beam/tpch/Schemas.java | 372 +++++++++++---------- .../org/apache/nemo/examples/beam/tpch/Tpch.java | 43 +-- ...leSumSQLITCase.java => SQLSimpleSumITCase.java} | 4 +- ...mSimpleSumSQLITCase.java => SQLTpchITCase.java} | 20 +- 4 files changed, 228 insertions(+), 211 deletions(-) diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Schemas.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Schemas.java index b464789..a778abe 100644 --- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Schemas.java +++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Schemas.java @@ -21,14 +21,27 @@ import org.apache.beam.sdk.schemas.Schema; /** * A simple SQL application. - * (Copied/Refined from the example code in the Beam repository) + * (Copied and adapted from https://github.com/apache/beam/pull/6240) */ public final class Schemas { - // private static final JavaTypeFactory TYPE_FACTORY = - // new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT); - // private final ImmutableMap<String, TpcdsTable> tableHMap; - private final ImmutableMap<String, String> columnPrefixes; - public static Schema storeSalesSchema = + /** + * Private. + */ + private Schemas() { + } + + public static final ImmutableMap<String, String> COLUMN_PREFIX = ImmutableMap.<String, String>builder() + .put("lineitem", "l_") + .put("customer", "c_") + .put("supplier", "s_") + .put("partsupp", "ps_") + .put("part", "p_") + .put("orders", "o_") + .put("nation", "n_") + .put("region", "r_") + .build(); + + public static final Schema STORE_SALES_SCHEMA = Schema.builder() .addNullableField("ss_sold_date_sk", Schema.FieldType.INT32) .addNullableField("ss_sold_time_sk", Schema.FieldType.INT32) @@ -54,7 +67,8 @@ public final class Schemas { .addNullableField("ss_net_paid_inc_tax", Schema.FieldType.FLOAT) .addNullableField("ss_net_profit", Schema.FieldType.FLOAT) .build(); - public static Schema dateDimSchema = + + public static final Schema DATE_DIM_SCHEMA = Schema.builder() .addNullableField("d_date_sk", Schema.FieldType.INT32) .addNullableField("d_date_id", Schema.FieldType.STRING) @@ -85,76 +99,79 @@ public final class Schemas { .addNullableField("d_current_quarter", Schema.FieldType.STRING) .addNullableField("d_current_year", Schema.FieldType.STRING) .build(); - public static Schema itemSchema = + + public static final Schema ITEM_SCHEMA = Schema.builder() .addNullableField("i_item_sk", Schema.FieldType.INT32) - .addNullableField("i_item_id", Schema.FieldType.STRING) // .string, - .addNullableField("i_rec_start_date", Schema.FieldType.DATETIME) // .date, - .addNullableField("i_rec_end_date", Schema.FieldType.DATETIME) // .date, - .addNullableField("i_item_desc", Schema.FieldType.STRING) // .string, - .addNullableField("i_current_price", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField( - "i_wholesale_cost", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("i_brand_id", Schema.FieldType.INT32) // .int, - .addNullableField("i_brand", Schema.FieldType.STRING) // .string, - .addNullableField("i_class_id", Schema.FieldType.INT32) // .int, - .addNullableField("i_class", Schema.FieldType.STRING) // .string, - .addNullableField("i_category_id", Schema.FieldType.INT32) // .int, - .addNullableField("i_category", Schema.FieldType.STRING) // .string, - .addNullableField("i_manufact_id", Schema.FieldType.INT32) // .int, - .addNullableField("i_manufact", Schema.FieldType.STRING) // .string, - .addNullableField("i_size", Schema.FieldType.STRING) // .string, - .addNullableField("i_formulation", Schema.FieldType.STRING) // .string, - .addNullableField("i_color", Schema.FieldType.STRING) // .string, - .addNullableField("i_units", Schema.FieldType.STRING) // .string, - .addNullableField("i_container", Schema.FieldType.STRING) // .string, - .addNullableField("i_manager_id", Schema.FieldType.INT32) // .int, - .addNullableField("i_product_name", Schema.FieldType.STRING) // .string), + .addNullableField("i_item_id", Schema.FieldType.STRING) + .addNullableField("i_rec_start_date", Schema.FieldType.DATETIME) + .addNullableField("i_rec_end_date", Schema.FieldType.DATETIME) + .addNullableField("i_item_desc", Schema.FieldType.STRING) + .addNullableField("i_current_price", Schema.FieldType.FLOAT) + .addNullableField("i_wholesale_cost", Schema.FieldType.FLOAT) + .addNullableField("i_brand_id", Schema.FieldType.INT32) + .addNullableField("i_brand", Schema.FieldType.STRING) + .addNullableField("i_class_id", Schema.FieldType.INT32) + .addNullableField("i_class", Schema.FieldType.STRING) + .addNullableField("i_category_id", Schema.FieldType.INT32) + .addNullableField("i_category", Schema.FieldType.STRING) + .addNullableField("i_manufact_id", Schema.FieldType.INT32) + .addNullableField("i_manufact", Schema.FieldType.STRING) + .addNullableField("i_size", Schema.FieldType.STRING) + .addNullableField("i_formulation", Schema.FieldType.STRING) + .addNullableField("i_color", Schema.FieldType.STRING) + .addNullableField("i_units", Schema.FieldType.STRING) + .addNullableField("i_container", Schema.FieldType.STRING) + .addNullableField("i_manager_id", Schema.FieldType.INT32) + .addNullableField("i_product_name", Schema.FieldType.STRING) .build(); - public static Schema inventorySchema = + + public static final Schema INVENTORY_SCHEMA = Schema.builder() .addNullableField("inv_date_sk", Schema.FieldType.INT32) .addNullableField("inv_item_sk", Schema.FieldType.INT32) .addNullableField("inv_warehouse_sk", Schema.FieldType.INT32) .addNullableField("inv_quantity_on_hand", Schema.FieldType.INT32) .build(); - public static Schema catalogSalesSchema = + + public static final Schema CATALOG_SALES_SCHEMA = Schema.builder() - .addNullableField("cs_sold_date_sk", Schema.FieldType.INT32) // .int, - .addNullableField("cs_sold_time_sk", Schema.FieldType.INT32) // .int, - .addNullableField("cs_ship_date_sk", Schema.FieldType.INT32) // .int, - .addNullableField("cs_bill_customer_sk", Schema.FieldType.INT32) // .int, - .addNullableField("cs_bill_cdemo_sk", Schema.FieldType.INT32) // .int, - .addNullableField("cs_bill_hdemo_sk", Schema.FieldType.INT32) // .int, - .addNullableField("cs_bill_addr_sk", Schema.FieldType.INT32) // .int, - .addNullableField("cs_ship_customer_sk", Schema.FieldType.INT32) // .int, - .addNullableField("cs_ship_cdemo_sk", Schema.FieldType.INT32) // .int, - .addNullableField("cs_ship_hdemo_sk", Schema.FieldType.INT32) // .int, - .addNullableField("cs_ship_addr_sk", Schema.FieldType.INT32) // .int, - .addNullableField("cs_call_center_sk", Schema.FieldType.INT32) // .int, - .addNullableField("cs_catalog_page_sk", Schema.FieldType.INT32) // .int, - .addNullableField("cs_ship_mode_sk", Schema.FieldType.INT32) // .int, - .addNullableField("cs_warehouse_sk", Schema.FieldType.INT32) // .int, - .addNullableField("cs_item_sk", Schema.FieldType.INT32) // .int, - .addNullableField("cs_promo_sk", Schema.FieldType.INT32) // .int, - .addNullableField("cs_order_number", Schema.FieldType.INT64) // .long, - .addNullableField("cs_quantity", Schema.FieldType.INT32) // .int, - .addNullableField("cs_wholesale_cost", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("cs_list_price", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("cs_sales_price", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("cs_ext_discount_amt", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("cs_ext_sales_price", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("cs_ext_wholesale_cost", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("cs_ext_list_price", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("cs_ext_tax", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("cs_coupon_amt", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("cs_ext_ship_cost", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("cs_net_paid", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("cs_net_paid_inc_tax", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("cs_net_paid_inc_ship", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("cs_net_paid_inc_ship_tax", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("cs_net_profit", Schema.FieldType.FLOAT) // .decimal(7,2)) + .addNullableField("cs_sold_date_sk", Schema.FieldType.INT32) + .addNullableField("cs_sold_time_sk", Schema.FieldType.INT32) + .addNullableField("cs_ship_date_sk", Schema.FieldType.INT32) + .addNullableField("cs_bill_customer_sk", Schema.FieldType.INT32) + .addNullableField("cs_bill_cdemo_sk", Schema.FieldType.INT32) + .addNullableField("cs_bill_hdemo_sk", Schema.FieldType.INT32) + .addNullableField("cs_bill_addr_sk", Schema.FieldType.INT32) + .addNullableField("cs_ship_customer_sk", Schema.FieldType.INT32) + .addNullableField("cs_ship_cdemo_sk", Schema.FieldType.INT32) + .addNullableField("cs_ship_hdemo_sk", Schema.FieldType.INT32) + .addNullableField("cs_ship_addr_sk", Schema.FieldType.INT32) + .addNullableField("cs_call_center_sk", Schema.FieldType.INT32) + .addNullableField("cs_catalog_page_sk", Schema.FieldType.INT32) + .addNullableField("cs_ship_mode_sk", Schema.FieldType.INT32) + .addNullableField("cs_warehouse_sk", Schema.FieldType.INT32) + .addNullableField("cs_item_sk", Schema.FieldType.INT32) + .addNullableField("cs_promo_sk", Schema.FieldType.INT32) + .addNullableField("cs_order_number", Schema.FieldType.INT64) + .addNullableField("cs_quantity", Schema.FieldType.INT32) + .addNullableField("cs_wholesale_cost", Schema.FieldType.FLOAT) + .addNullableField("cs_list_price", Schema.FieldType.FLOAT) + .addNullableField("cs_sales_price", Schema.FieldType.FLOAT) + .addNullableField("cs_ext_discount_amt", Schema.FieldType.FLOAT) + .addNullableField("cs_ext_sales_price", Schema.FieldType.FLOAT) + .addNullableField("cs_ext_wholesale_cost", Schema.FieldType.FLOAT) + .addNullableField("cs_ext_list_price", Schema.FieldType.FLOAT) + .addNullableField("cs_ext_tax", Schema.FieldType.FLOAT) + .addNullableField("cs_coupon_amt", Schema.FieldType.FLOAT) + .addNullableField("cs_ext_ship_cost", Schema.FieldType.FLOAT) + .addNullableField("cs_net_paid", Schema.FieldType.FLOAT) + .addNullableField("cs_net_paid_inc_tax", Schema.FieldType.FLOAT) + .addNullableField("cs_net_paid_inc_ship", Schema.FieldType.FLOAT) + .addNullableField("cs_net_paid_inc_ship_tax", Schema.FieldType.FLOAT) + .addNullableField("cs_net_profit", Schema.FieldType.FLOAT) .build(); + public static final Schema ORDER_SCHEMA = Schema.builder() .addInt32Field("o_orderkey") @@ -167,6 +184,7 @@ public final class Schemas { .addInt32Field("o_shippriority") .addStringField("o_comment") .build(); + public static final Schema CUSTOMER_SCHEMA = Schema.builder() .addInt32Field("c_custkey") @@ -178,27 +196,29 @@ public final class Schemas { .addStringField("c_mktsegment") .addStringField("c_comment") .build(); - public static Schema getCustomerDsSchema = + + public static final Schema CUSTOMER_DS_SCHEMA = Schema.builder() - .addNullableField("c_customer_sk", Schema.FieldType.INT32) // .int, - .addNullableField("c_customer_id", Schema.FieldType.STRING) // .string, - .addNullableField("c_current_cdemo_sk", Schema.FieldType.INT32) // .int, - .addNullableField("c_current_hdemo_sk", Schema.FieldType.INT32) // .int, - .addNullableField("c_current_addr_sk", Schema.FieldType.INT32) // .int, - .addNullableField("c_first_shipto_date_sk", Schema.FieldType.INT32) // .int, - .addNullableField("c_first_sales_date_sk", Schema.FieldType.INT32) // .int, - .addNullableField("c_salutation", Schema.FieldType.STRING) // .string, - .addNullableField("c_first_name", Schema.FieldType.STRING) // .string, - .addNullableField("c_last_name", Schema.FieldType.STRING) // .string, - .addNullableField("c_preferred_cust_flag", Schema.FieldType.STRING) // .string, - .addNullableField("c_birth_day", Schema.FieldType.INT32) // .int, - .addNullableField("c_birth_month", Schema.FieldType.INT32) // .int, - .addNullableField("c_birth_year", Schema.FieldType.INT32) // .int, - .addNullableField("c_birth_country", Schema.FieldType.STRING) // .string, - .addNullableField("c_login", Schema.FieldType.STRING) // .string, - .addNullableField("c_email_address", Schema.FieldType.STRING) // .string, - .addNullableField("c_last_review_date", Schema.FieldType.STRING) // .string) + .addNullableField("c_customer_sk", Schema.FieldType.INT32) + .addNullableField("c_customer_id", Schema.FieldType.STRING) + .addNullableField("c_current_cdemo_sk", Schema.FieldType.INT32) + .addNullableField("c_current_hdemo_sk", Schema.FieldType.INT32) + .addNullableField("c_current_addr_sk", Schema.FieldType.INT32) + .addNullableField("c_first_shipto_date_sk", Schema.FieldType.INT32) + .addNullableField("c_first_sales_date_sk", Schema.FieldType.INT32) + .addNullableField("c_salutation", Schema.FieldType.STRING) + .addNullableField("c_first_name", Schema.FieldType.STRING) + .addNullableField("c_last_name", Schema.FieldType.STRING) + .addNullableField("c_preferred_cust_flag", Schema.FieldType.STRING) + .addNullableField("c_birth_day", Schema.FieldType.INT32) + .addNullableField("c_birth_month", Schema.FieldType.INT32) + .addNullableField("c_birth_year", Schema.FieldType.INT32) + .addNullableField("c_birth_country", Schema.FieldType.STRING) + .addNullableField("c_login", Schema.FieldType.STRING) + .addNullableField("c_email_address", Schema.FieldType.STRING) + .addNullableField("c_last_review_date", Schema.FieldType.STRING) .build(); + public static final Schema LINEITEM_SCHEMA = Schema.builder() .addInt32Field("l_orderkey") @@ -218,30 +238,34 @@ public final class Schemas { .addStringField("l_shipmode") .addStringField("l_comment") .build(); + public static final Schema PARTSUPP_SCHEMA = Schema.builder() - .addInt32Field("ps_partkey") // identifier - .addInt32Field("ps_suppkey") // identifier - .addInt32Field("ps_availqty") // integer - .addFloatField("ps_supplycost") // decimal - .addStringField("ps_comment") // variable text, size 199 + .addInt32Field("ps_partkey") + .addInt32Field("ps_suppkey") + .addInt32Field("ps_availqty") + .addFloatField("ps_supplycost") + .addStringField("ps_comment") .build(); + public static final Schema REGION_SCHEMA = Schema.builder() - .addInt32Field("r_regionkey") // identifier - .addStringField("r_name") // fixed text, size 25 - .addStringField("r_comment") // variable text, size 152 + .addInt32Field("r_regionkey") + .addStringField("r_name") + .addStringField("r_comment") .build(); + public static final Schema SUPPLIER_SCHEMA = Schema.builder() - .addInt32Field("s_suppkey") // identifier - .addStringField("s_name") // fixed text, size 25 - .addStringField("s_address") // variable text, size 40 - .addInt32Field("s_nationkey") // identifier - .addStringField("s_phone") // fixed text, size 15 - .addFloatField("s_acctbal") // decimal - .addStringField("s_comment") // variable text, size 101 + .addInt32Field("s_suppkey") + .addStringField("s_name") + .addStringField("s_address") + .addInt32Field("s_nationkey") + .addStringField("s_phone") + .addFloatField("s_acctbal") + .addStringField("s_comment") .build(); + public static final Schema PART_SCHEMA = Schema.builder() .addInt32Field("p_partkey") @@ -254,6 +278,7 @@ public final class Schemas { .addFloatField("p_retailprice") .addStringField("p_comment") .build(); + public static final Schema NATION_SCHEMA = Schema.builder() .addInt32Field("n_nationkey") @@ -261,93 +286,78 @@ public final class Schemas { .addInt32Field("n_regionkey") .addStringField("n_comment") .build(); - public static Schema promotionSchema = + + public static final Schema PROMOTION_SCHEMA = Schema.builder() .addNullableField("p_promo_sk", Schema.FieldType.INT32) - .addNullableField("p_promo_id", Schema.FieldType.STRING) // .string, - .addNullableField("p_start_date_sk", Schema.FieldType.INT32) // .int, - .addNullableField("p_end_date_sk", Schema.FieldType.INT32) // .int, - .addNullableField("p_item_sk", Schema.FieldType.INT32) // .int, - .addNullableField("p_cost", Schema.FieldType.FLOAT) // .decimal(15,2), - .addNullableField("p_response_target", Schema.FieldType.INT32) // .int, - .addNullableField("p_promo_name", Schema.FieldType.STRING) // .string, - .addNullableField("p_channel_dmail", Schema.FieldType.STRING) // .string, - .addNullableField("p_channel_email", Schema.FieldType.STRING) // .string, - .addNullableField("p_channel_catalog", Schema.FieldType.STRING) // .string, - .addNullableField("p_channel_tv", Schema.FieldType.STRING) // .string, - .addNullableField("p_channel_radio", Schema.FieldType.STRING) // .string, - .addNullableField("p_channel_press", Schema.FieldType.STRING) // .string, - .addNullableField("p_channel_event", Schema.FieldType.STRING) // .string, - .addNullableField("p_channel_demo", Schema.FieldType.STRING) // .string, - .addNullableField("p_channel_details", Schema.FieldType.STRING) // .string, - .addNullableField("p_purpose", Schema.FieldType.STRING) // .string, - .addNullableField("p_discount_active", Schema.FieldType.STRING) // .string), + .addNullableField("p_promo_id", Schema.FieldType.STRING) + .addNullableField("p_start_date_sk", Schema.FieldType.INT32) + .addNullableField("p_end_date_sk", Schema.FieldType.INT32) + .addNullableField("p_item_sk", Schema.FieldType.INT32) + .addNullableField("p_cost", Schema.FieldType.FLOAT) + .addNullableField("p_response_target", Schema.FieldType.INT32) + .addNullableField("p_promo_name", Schema.FieldType.STRING) + .addNullableField("p_channel_dmail", Schema.FieldType.STRING) + .addNullableField("p_channel_email", Schema.FieldType.STRING) + .addNullableField("p_channel_catalog", Schema.FieldType.STRING) + .addNullableField("p_channel_tv", Schema.FieldType.STRING) + .addNullableField("p_channel_radio", Schema.FieldType.STRING) + .addNullableField("p_channel_press", Schema.FieldType.STRING) + .addNullableField("p_channel_event", Schema.FieldType.STRING) + .addNullableField("p_channel_demo", Schema.FieldType.STRING) + .addNullableField("p_channel_details", Schema.FieldType.STRING) + .addNullableField("p_purpose", Schema.FieldType.STRING) + .addNullableField("p_discount_active", Schema.FieldType.STRING) .build(); - public static Schema customerDemographicsSchema = + + public static final Schema CUSTOMER_DEMOGRAPHIC_SCHEMA = Schema.builder() .addNullableField("cd_demo_sk", Schema.FieldType.INT32) - .addNullableField("cd_gender", Schema.FieldType.STRING) // .string, - .addNullableField("cd_marital_status", Schema.FieldType.STRING) // .string, - .addNullableField("cd_education_status", Schema.FieldType.STRING) // .string, - .addNullableField("cd_purchase_estimate", Schema.FieldType.INT32) // .int, - .addNullableField("cd_credit_rating", Schema.FieldType.STRING) // .string, - .addNullableField("cd_dep_count", Schema.FieldType.INT32) // .int, - .addNullableField("cd_dep_employed_count", Schema.FieldType.INT32) // .int, - .addNullableField("cd_dep_college_count", Schema.FieldType.INT32) // .int), + .addNullableField("cd_gender", Schema.FieldType.STRING) + .addNullableField("cd_marital_status", Schema.FieldType.STRING) + .addNullableField("cd_education_status", Schema.FieldType.STRING) + .addNullableField("cd_purchase_estimate", Schema.FieldType.INT32) + .addNullableField("cd_credit_rating", Schema.FieldType.STRING) + .addNullableField("cd_dep_count", Schema.FieldType.INT32) + .addNullableField("cd_dep_employed_count", Schema.FieldType.INT32) + .addNullableField("cd_dep_college_count", Schema.FieldType.INT32) .build(); - public static Schema webSalesSchema = + + public static final Schema WEB_SALES_SCHEMA = Schema.builder() - .addNullableField("ws_sold_date_sk", Schema.FieldType.INT32) // .int, - .addNullableField("ws_sold_time_sk", Schema.FieldType.INT32) // .int, - .addNullableField("ws_ship_date_sk", Schema.FieldType.INT32) // .int, - .addNullableField("ws_item_sk", Schema.FieldType.INT32) // .int, - .addNullableField("ws_bill_customer_sk", Schema.FieldType.INT32) // .int, - .addNullableField("ws_bill_cdemo_sk", Schema.FieldType.INT32) // .int, - .addNullableField("ws_bill_hdemo_sk", Schema.FieldType.INT32) // .int, - .addNullableField("ws_bill_addr_sk", Schema.FieldType.INT32) // .int, - .addNullableField("ws_ship_customer_sk", Schema.FieldType.INT32) // .int, - .addNullableField("ws_ship_cdemo_sk", Schema.FieldType.INT32) // .int, - .addNullableField("ws_ship_hdemo_sk", Schema.FieldType.INT32) // .int, - .addNullableField("ws_ship_addr_sk", Schema.FieldType.INT32) // .int, - .addNullableField("ws_web_page_sk", Schema.FieldType.INT32) // .int, - .addNullableField("ws_web_site_sk", Schema.FieldType.INT32) // .int, - .addNullableField("ws_ship_mode_sk", Schema.FieldType.INT32) // .int, - .addNullableField("ws_warehouse_sk", Schema.FieldType.INT32) // .int, - .addNullableField("ws_promo_sk", Schema.FieldType.INT32) // .int, - .addNullableField("ws_order_number", Schema.FieldType.INT64) // .long, - .addNullableField("ws_quantity", Schema.FieldType.INT32) // .int, - .addNullableField("ws_wholesale_cost", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("ws_list_price", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("ws_sales_price", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("ws_ext_discount_amt", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("ws_ext_sales_price", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("ws_ext_wholesale_cost", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("ws_ext_list_price", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("ws_ext_tax", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("ws_coupon_amt", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("ws_ext_ship_cost", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("ws_net_paid", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("ws_net_paid_inc_tax", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("ws_net_paid_inc_ship", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("ws_net_paid_inc_ship_tax", Schema.FieldType.FLOAT) // .decimal(7,2), - .addNullableField("ws_net_profit", Schema.FieldType.FLOAT) // .decimal(7,2)), + .addNullableField("ws_sold_date_sk", Schema.FieldType.INT32) + .addNullableField("ws_sold_time_sk", Schema.FieldType.INT32) + .addNullableField("ws_ship_date_sk", Schema.FieldType.INT32) + .addNullableField("ws_item_sk", Schema.FieldType.INT32) + .addNullableField("ws_bill_customer_sk", Schema.FieldType.INT32) + .addNullableField("ws_bill_cdemo_sk", Schema.FieldType.INT32) + .addNullableField("ws_bill_hdemo_sk", Schema.FieldType.INT32) + .addNullableField("ws_bill_addr_sk", Schema.FieldType.INT32) + .addNullableField("ws_ship_customer_sk", Schema.FieldType.INT32) + .addNullableField("ws_ship_cdemo_sk", Schema.FieldType.INT32) + .addNullableField("ws_ship_hdemo_sk", Schema.FieldType.INT32) + .addNullableField("ws_ship_addr_sk", Schema.FieldType.INT32) + .addNullableField("ws_web_page_sk", Schema.FieldType.INT32) + .addNullableField("ws_web_site_sk", Schema.FieldType.INT32) + .addNullableField("ws_ship_mode_sk", Schema.FieldType.INT32) + .addNullableField("ws_warehouse_sk", Schema.FieldType.INT32) + .addNullableField("ws_promo_sk", Schema.FieldType.INT32) + .addNullableField("ws_order_number", Schema.FieldType.INT64) + .addNullableField("ws_quantity", Schema.FieldType.INT32) + .addNullableField("ws_wholesale_cost", Schema.FieldType.FLOAT) + .addNullableField("ws_list_price", Schema.FieldType.FLOAT) + .addNullableField("ws_sales_price", Schema.FieldType.FLOAT) + .addNullableField("ws_ext_discount_amt", Schema.FieldType.FLOAT) + .addNullableField("ws_ext_sales_price", Schema.FieldType.FLOAT) + .addNullableField("ws_ext_wholesale_cost", Schema.FieldType.FLOAT) + .addNullableField("ws_ext_list_price", Schema.FieldType.FLOAT) + .addNullableField("ws_ext_tax", Schema.FieldType.FLOAT) + .addNullableField("ws_coupon_amt", Schema.FieldType.FLOAT) + .addNullableField("ws_ext_ship_cost", Schema.FieldType.FLOAT) + .addNullableField("ws_net_paid", Schema.FieldType.FLOAT) + .addNullableField("ws_net_paid_inc_tax", Schema.FieldType.FLOAT) + .addNullableField("ws_net_paid_inc_ship", Schema.FieldType.FLOAT) + .addNullableField("ws_net_paid_inc_ship_tax", Schema.FieldType.FLOAT) + .addNullableField("ws_net_profit", Schema.FieldType.FLOAT) .build(); - public Schemas() { - // final ImmutableMap.Builder<String, TpcdsTable> builder = ImmutableMap.builder(); - // for (TpcdsTable<?> tpcdsTable : TpcdsTable.getTables()) { - // builder.put(tpcdsTable.getTableName(), tpcdsTable); - // } - // this.tableHMap = builder.build(); - this.columnPrefixes = - ImmutableMap.<String, String>builder() - .put("lineitem", "l_") - .put("customer", "c_") - .put("supplier", "s_") - .put("partsupp", "ps_") - .put("part", "p_") - .put("orders", "o_") - .put("nation", "n_") - .put("region", "r_") - .build(); - } } diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java index 4ebf0f3..4f45920 100644 --- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java +++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java @@ -41,7 +41,7 @@ import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beam /** * A simple SQL application. - * (Copied/Refined from the example code in the Beam repository) + * (Copied and adapted from https://github.com/apache/beam/pull/6240) */ public final class Tpch { private static final Logger LOG = LoggerFactory.getLogger(Tpch.class.getName()); @@ -161,11 +161,14 @@ public final class Tpch { private Tpch() { } + /** + * Row csv formats. + */ static class RowToCsv extends PTransform<PCollection<Row>, PCollection<String>> implements Serializable { - private CSVFormat csvFormat; + private final CSVFormat csvFormat; - public RowToCsv(CSVFormat csvFormat) { + RowToCsv(final CSVFormat csvFormat) { this.csvFormat = csvFormat; } @@ -174,7 +177,7 @@ public final class Tpch { } @Override - public PCollection<String> expand(PCollection<Row> input) { + public PCollection<String> expand(final PCollection<Row> input) { return input.apply( "rowToCsv", MapElements.into(TypeDescriptors.strings()).via(row -> beamRow2CsvLine(row, csvFormat))); @@ -185,22 +188,24 @@ public final class Tpch { final CSVFormat csvFormat, final String inputDirectory) { final ImmutableMap<String, Schema> hSchemas = ImmutableMap.<String, Schema>builder() - //.put("customer", Schemas.CUSTOMER_SCHEMA) .put("lineitem", Schemas.LINEITEM_SCHEMA) - //.put("nation", Schemas.NATION_SCHEMA) - //.put("orders", Schemas.ORDER_SCHEMA) - //.put("part", Schemas.PART_SCHEMA) - //.put("partsupp", Schemas.PARTSUPP_SCHEMA) - //.put("region", Schemas.REGION_SCHEMA) - //.put("supplier", Schemas.SUPPLIER_SCHEMA) - // .put("store_sales", Schemas.storeSalesSchema) - // .put("catalog_sales", Schemas.catalogSalesSchema) - // .put("item", Schemas.itemSchema) - // .put("date_dim", Schemas.dateDimSchema) - // .put("promotion", Schemas.promotionSchema) - // .put("customer_demographics", Schemas.customerDemographicsSchema) - // .put("web_sales", Schemas.webSalesSchema) - // .put("inventory", Schemas.inventorySchema) + /* + .put("customer", Schemas.CUSTOMER_SCHEMA) + .put("nation", Schemas.NATION_SCHEMA) + .put("orders", Schemas.ORDER_SCHEMA) + .put("part", Schemas.PART_SCHEMA) + .put("partsupp", Schemas.PARTSUPP_SCHEMA) + .put("region", Schemas.REGION_SCHEMA) + .put("supplier", Schemas.SUPPLIER_SCHEMA) + .put("store_sales", Schemas.STORE_SALES_SCHEMA) + .put("catalog_sales", Schemas.CATALOG_SALES_SCHEMA) + .put("item", Schemas.ITEM_SCHEMA) + .put("date_dim", Schemas.DATE_DIM_SCHEMA) + .put("promotion", Schemas.PROMOTION_SCHEMA) + .put("customer_demographics", Schemas.CUSTOMER_DEMOGRAPHIC_SCHEMA) + .put("web_sales", Schemas.WEB_SALES_SCHEMA) + .put("inventory", Schemas.INVENTORY_SCHEMA) + */ .build(); PCollectionTuple tables = PCollectionTuple.empty(pipeline); diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/BeamSimpleSumSQLITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLSimpleSumITCase.java similarity index 95% copy from examples/beam/src/test/java/org/apache/nemo/examples/beam/BeamSimpleSumSQLITCase.java copy to examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLSimpleSumITCase.java index 4d55ade..a61ed70 100644 --- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/BeamSimpleSumSQLITCase.java +++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLSimpleSumITCase.java @@ -31,7 +31,7 @@ import org.powermock.modules.junit4.PowerMockRunner; */ @RunWith(PowerMockRunner.class) @PrepareForTest(JobLauncher.class) -public final class BeamSimpleSumSQLITCase { +public final class SQLSimpleSumITCase { private static final int TIMEOUT = 180000; private static ArgBuilder builder; private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/"; @@ -61,7 +61,7 @@ public final class BeamSimpleSumSQLITCase { @Test (timeout = TIMEOUT) public void test() throws Exception { JobLauncher.main(builder - .addJobId(BeamSimpleSumSQLITCase.class.getSimpleName()) + .addJobId(SQLSimpleSumITCase.class.getSimpleName()) .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName()) .build()); } diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/BeamSimpleSumSQLITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java similarity index 81% rename from examples/beam/src/test/java/org/apache/nemo/examples/beam/BeamSimpleSumSQLITCase.java rename to examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java index 4d55ade..0b7668e 100644 --- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/BeamSimpleSumSQLITCase.java +++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java @@ -17,8 +17,8 @@ package org.apache.nemo.examples.beam; import org.apache.nemo.client.JobLauncher; import org.apache.nemo.common.test.ArgBuilder; -import org.apache.nemo.common.test.ExampleTestUtil; import org.apache.nemo.examples.beam.policy.DefaultPolicyParallelismFive; +import org.apache.nemo.examples.beam.tpch.Tpch; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -27,11 +27,11 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; /** - * Test Broadcast program with JobLauncher. + * Test TPC-H program with JobLauncher. */ @RunWith(PowerMockRunner.class) @PrepareForTest(JobLauncher.class) -public final class BeamSimpleSumSQLITCase { +public final class SQLTpchITCase { private static final int TIMEOUT = 180000; private static ArgBuilder builder; private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/"; @@ -44,25 +44,27 @@ public final class BeamSimpleSumSQLITCase { @Before public void setUp() throws Exception { builder = new ArgBuilder() - .addUserMain(SimpleSumSQL.class.getCanonicalName()) - .addUserArgs(outputFilePath) .addResourceJson(executorResourceFileName); } @After public void tearDown() throws Exception { + /* try { ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, expectedOutputFileName); } finally { ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName); } + */ } @Test (timeout = TIMEOUT) - public void test() throws Exception { + public void testSix() throws Exception { JobLauncher.main(builder - .addJobId(BeamSimpleSumSQLITCase.class.getSimpleName()) - .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName()) - .build()); + .addUserMain(Tpch.class.getCanonicalName()) + .addUserArgs("6", "/home/johnyangk/Desktop/tpc-concat-tbls/", outputFilePath) + .addJobId(SQLTpchITCase.class.getSimpleName()) + .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName()) + .build()); } }
