This is an automated email from the ASF dual-hosted git repository. johnyangk pushed a commit to branch tpch-fix in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
commit 4072654d07390410ead7d77a138e480c98032dfd
Author: John Yang <[email protected]>
AuthorDate: Wed Sep 12 10:48:21 2018 +0900

    generic
---
 .../src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
index 3ce3072..42db3da 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java
@@ -214,13 +214,9 @@ public final class Tpch {
 
     PCollectionTuple tables = PCollectionTuple.empty(pipeline);
     for (final Map.Entry<String, Schema> tableSchema : hSchemas.entrySet()) {
-      final String filePattern = inputDirectory + tableSchema.getKey() + ".tbl";
-      final PCollection<Row> table = new TextTable(
-        tableSchema.getValue(),
-        filePattern,
-        new TextTableProvider.CsvToRow(tableSchema.getValue(), csvFormat),
-        new RowToCsv(csvFormat))
-        .buildIOReader(pipeline.begin())
+      final String filePattern = inputDirectory + tableSchema.getKey() + ".tbl*";
+      final PCollection<Row> table = GenericSourceSink.read(pipeline, filePattern)
+        .apply("StringToRow", new TextTableProvider.CsvToRow(tableSchema.getValue(), csvFormat))
         .setCoder(tableSchema.getValue().getRowCoder())
         .setName(tableSchema.getKey());
       tables = tables.and(new TupleTag<>(tableSchema.getKey()), table);
