This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch tpch-fix
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

commit 4072654d07390410ead7d77a138e480c98032dfd
Author: John Yang <[email protected]>
AuthorDate: Wed Sep 12 10:48:21 2018 +0900

    generic
---
 .../src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
index 3ce3072..42db3da 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
@@ -214,13 +214,9 @@ public final class Tpch {
 
     PCollectionTuple tables = PCollectionTuple.empty(pipeline);
     for (final Map.Entry<String, Schema> tableSchema : hSchemas.entrySet()) {
-      final String filePattern = inputDirectory + tableSchema.getKey() + ".tbl";
-      final PCollection<Row> table = new TextTable(
-        tableSchema.getValue(),
-        filePattern,
-        new TextTableProvider.CsvToRow(tableSchema.getValue(), csvFormat),
-        new RowToCsv(csvFormat))
-        .buildIOReader(pipeline.begin())
+      final String filePattern = inputDirectory + tableSchema.getKey() + ".tbl*";
+      final PCollection<Row> table = GenericSourceSink.read(pipeline, filePattern)
+        .apply("StringToRow", new TextTableProvider.CsvToRow(tableSchema.getValue(), csvFormat))
         .setCoder(tableSchema.getValue().getRowCoder())
         .setName(tableSchema.getKey());
       tables = tables.and(new TupleTag<>(tableSchema.getKey()), table);

Reply via email to