This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch tpch-fix in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
commit 0092a5357c7b9326301a18d386b109d805be5b4b Author: John Yang <[email protected]> AuthorDate: Mon Sep 10 10:01:20 2018 +0900 all queries run --- .../compiler/frontend/beam/PipelineTranslator.java | 1 + .../org/apache/nemo/examples/beam/tpch/Tpch.java | 16 +++++----- .../apache/nemo/examples/beam/SQLTpchITCase.java | 36 +++++++++++++++++++++- 3 files changed, 45 insertions(+), 8 deletions(-) diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java index 67b8828..5d77296 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java @@ -237,6 +237,7 @@ public final class PipelineTranslator final boolean handlesBeamRow = Stream .concat(transformVertex.getNode().getInputs().values().stream(), transformVertex.getNode().getOutputs().values().stream()) + .filter(pValue -> getCoder(pValue, ctx.pipeline) instanceof KvCoder) .map(pValue -> (KvCoder) getCoder(pValue, ctx.pipeline)) // Input and output of combine should be KV .map(kvCoder -> kvCoder.getValueCoder().getEncodedTypeDescriptor()) // We're interested in the 'Value' of KV .anyMatch(valueTypeDescriptor -> TypeDescriptor.of(Row.class).equals(valueTypeDescriptor)); diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java index ad93bd9..3ce3072 100644 --- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java +++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java @@ -191,14 +191,16 @@ public final class Tpch { final String inputDirectory) { final ImmutableMap<String, Schema> hSchemas = ImmutableMap.<String, Schema>builder() .put("lineitem", 
Schemas.LINEITEM_SCHEMA) - /* .put("customer", Schemas.CUSTOMER_SCHEMA) - .put("nation", Schemas.NATION_SCHEMA) .put("orders", Schemas.ORDER_SCHEMA) + + .put("supplier", Schemas.SUPPLIER_SCHEMA) + .put("nation", Schemas.NATION_SCHEMA) + .put("region", Schemas.REGION_SCHEMA) + + /* .put("part", Schemas.PART_SCHEMA) .put("partsupp", Schemas.PARTSUPP_SCHEMA) - .put("region", Schemas.REGION_SCHEMA) - .put("supplier", Schemas.SUPPLIER_SCHEMA) .put("store_sales", Schemas.STORE_SALES_SCHEMA) .put("catalog_sales", Schemas.CATALOG_SALES_SCHEMA) .put("item", Schemas.ITEM_SCHEMA) @@ -248,7 +250,7 @@ public final class Tpch { final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class); options.setRunner(NemoPipelineRunner.class); - options.setJobName("SimpleSQL"); + options.setJobName("TPC-H"); final Pipeline p = Pipeline.create(options); // Create tables @@ -265,8 +267,8 @@ public final class Tpch { new SerializableFunction<Row, String>() { @Override public String apply(final Row input) { - System.out.println("row: " + input.getValues()); - return "row: " + input.getValues(); + System.out.println(input.getValues().toString()); + return input.getValues().toString(); } })); diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java index 427d053..87df9ae 100644 --- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java +++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java @@ -58,10 +58,44 @@ public final class SQLTpchITCase { } @Test (timeout = TIMEOUT) + public void testThree() throws Exception { + final int queryNum = 3; + JobLauncher.main(builder + .addUserMain(Tpch.class.getCanonicalName()) + .addUserArgs(String.valueOf(queryNum), "/home/johnyangk/Desktop/tpc-concat-tbls/", outputFilePath) + .addJobId(SQLTpchITCase.class.getSimpleName()) + 
.addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName()) + .build()); + } + + @Test (timeout = TIMEOUT) + public void testFour() throws Exception { + final int queryNum = 4; + JobLauncher.main(builder + .addUserMain(Tpch.class.getCanonicalName()) + .addUserArgs(String.valueOf(queryNum), "/home/johnyangk/Desktop/tpc-concat-tbls/", outputFilePath) + .addJobId(SQLTpchITCase.class.getSimpleName()) + .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName()) + .build()); + } + + @Test (timeout = TIMEOUT) + public void testFive() throws Exception { + final int queryNum = 5; + JobLauncher.main(builder + .addUserMain(Tpch.class.getCanonicalName()) + .addUserArgs(String.valueOf(queryNum), "/home/johnyangk/Desktop/tpc-concat-tbls/", outputFilePath) + .addJobId(SQLTpchITCase.class.getSimpleName()) + .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName()) + .build()); + } + + @Test (timeout = TIMEOUT) public void testSix() throws Exception { + final int queryNum = 6; JobLauncher.main(builder .addUserMain(Tpch.class.getCanonicalName()) - .addUserArgs("6", "/home/johnyangk/Desktop/tpc-concat-tbls/", outputFilePath) + .addUserArgs(String.valueOf(queryNum), "/home/johnyangk/Desktop/tpc-concat-tbls/", outputFilePath) .addJobId(SQLTpchITCase.class.getSimpleName()) .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName()) .build());
