This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch tpch-fix
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

commit 0092a5357c7b9326301a18d386b109d805be5b4b
Author: John Yang <[email protected]>
AuthorDate: Mon Sep 10 10:01:20 2018 +0900

    all queries run
---
 .../compiler/frontend/beam/PipelineTranslator.java |  1 +
 .../org/apache/nemo/examples/beam/tpch/Tpch.java   | 16 +++++-----
 .../apache/nemo/examples/beam/SQLTpchITCase.java   | 36 +++++++++++++++++++++-
 3 files changed, 45 insertions(+), 8 deletions(-)

diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 67b8828..5d77296 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -237,6 +237,7 @@ public final class PipelineTranslator
     final boolean handlesBeamRow = Stream
       .concat(transformVertex.getNode().getInputs().values().stream(),
         transformVertex.getNode().getOutputs().values().stream())
+      .filter(pValue -> getCoder(pValue, ctx.pipeline) instanceof KvCoder)
       .map(pValue -> (KvCoder) getCoder(pValue, ctx.pipeline)) // Input and 
output of combine should be KV
       .map(kvCoder -> kvCoder.getValueCoder().getEncodedTypeDescriptor()) // 
We're interested in the 'Value' of KV
       .anyMatch(valueTypeDescriptor -> 
TypeDescriptor.of(Row.class).equals(valueTypeDescriptor));
diff --git 
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java 
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
index ad93bd9..3ce3072 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
@@ -191,14 +191,16 @@ public final class Tpch {
                                              final String inputDirectory) {
     final ImmutableMap<String, Schema> hSchemas = ImmutableMap.<String, 
Schema>builder()
       .put("lineitem", Schemas.LINEITEM_SCHEMA)
-      /*
       .put("customer", Schemas.CUSTOMER_SCHEMA)
-      .put("nation", Schemas.NATION_SCHEMA)
       .put("orders", Schemas.ORDER_SCHEMA)
+
+      .put("supplier", Schemas.SUPPLIER_SCHEMA)
+      .put("nation", Schemas.NATION_SCHEMA)
+      .put("region", Schemas.REGION_SCHEMA)
+
+      /*
       .put("part", Schemas.PART_SCHEMA)
       .put("partsupp", Schemas.PARTSUPP_SCHEMA)
-      .put("region", Schemas.REGION_SCHEMA)
-      .put("supplier", Schemas.SUPPLIER_SCHEMA)
       .put("store_sales", Schemas.STORE_SALES_SCHEMA)
       .put("catalog_sales", Schemas.CATALOG_SALES_SCHEMA)
       .put("item", Schemas.ITEM_SCHEMA)
@@ -248,7 +250,7 @@ public final class Tpch {
 
     final PipelineOptions options = 
PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
     options.setRunner(NemoPipelineRunner.class);
-    options.setJobName("SimpleSQL");
+    options.setJobName("TPC-H");
     final Pipeline p = Pipeline.create(options);
 
     // Create tables
@@ -265,8 +267,8 @@ public final class Tpch {
       new SerializableFunction<Row, String>() {
         @Override
         public String apply(final Row input) {
-          System.out.println("row: " + input.getValues());
-          return "row: " + input.getValues();
+          System.out.println(input.getValues().toString());
+          return input.getValues().toString();
         }
       }));
 
diff --git 
a/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java 
b/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java
index 427d053..87df9ae 100644
--- 
a/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java
+++ 
b/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java
@@ -58,10 +58,44 @@ public final class SQLTpchITCase {
   }
 
   @Test (timeout = TIMEOUT)
+  public void testThree() throws Exception {
+    final int queryNum = 3;
+    JobLauncher.main(builder
+      .addUserMain(Tpch.class.getCanonicalName())
+      .addUserArgs(String.valueOf(queryNum), 
"/home/johnyangk/Desktop/tpc-concat-tbls/", outputFilePath)
+      .addJobId(SQLTpchITCase.class.getSimpleName())
+      
.addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+      .build());
+  }
+
+  @Test (timeout = TIMEOUT)
+  public void testFour() throws Exception {
+    final int queryNum = 4;
+    JobLauncher.main(builder
+      .addUserMain(Tpch.class.getCanonicalName())
+      .addUserArgs(String.valueOf(queryNum), 
"/home/johnyangk/Desktop/tpc-concat-tbls/", outputFilePath)
+      .addJobId(SQLTpchITCase.class.getSimpleName())
+      
.addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+      .build());
+  }
+
+  @Test (timeout = TIMEOUT)
+  public void testFive() throws Exception {
+    final int queryNum = 5;
+    JobLauncher.main(builder
+      .addUserMain(Tpch.class.getCanonicalName())
+      .addUserArgs(String.valueOf(queryNum), 
"/home/johnyangk/Desktop/tpc-concat-tbls/", outputFilePath)
+      .addJobId(SQLTpchITCase.class.getSimpleName())
+      
.addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+      .build());
+  }
+
+  @Test (timeout = TIMEOUT)
   public void testSix() throws Exception {
+    final int queryNum = 6;
     JobLauncher.main(builder
       .addUserMain(Tpch.class.getCanonicalName())
-      .addUserArgs("6", "/home/johnyangk/Desktop/tpc-concat-tbls/", 
outputFilePath)
+      .addUserArgs(String.valueOf(queryNum), 
"/home/johnyangk/Desktop/tpc-concat-tbls/", outputFilePath)
       .addJobId(SQLTpchITCase.class.getSimpleName())
       
.addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
       .build());

Reply via email to