This is an automated email from the ASF dual-hosted git repository. johnyangk pushed a commit to branch tpch-fix in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
commit a471303f5b3a0ee29c5a674849f21eb20d71baca Author: John Yang <[email protected]> AuthorDate: Wed Sep 12 17:33:19 2018 +0900 query from file --- .../org/apache/nemo/examples/beam/tpch/Tpch.java | 56 ++++++++++++++++++++-- .../apache/nemo/examples/beam/SQLTpchITCase.java | 20 ++++++++ 2 files changed, 72 insertions(+), 4 deletions(-) diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java index 42db3da..0b62e8e 100644 --- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java +++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java @@ -19,7 +19,6 @@ package org.apache.nemo.examples.beam.tpch; import com.google.common.collect.ImmutableMap; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.sql.SqlTransform; -import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTable; import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -35,9 +34,16 @@ import org.apache.nemo.examples.beam.GenericSourceSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.io.Serializable; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beamRow2CsvLine; @@ -214,7 +220,7 @@ public final class Tpch { PCollectionTuple tables = PCollectionTuple.empty(pipeline); for (final Map.Entry<String, Schema> tableSchema : hSchemas.entrySet()) { - final String filePattern = inputDirectory + tableSchema.getKey() + ".tbl*"; + final String filePattern = inputDirectory + tableSchema.getKey() + ".tbl"; final PCollection<Row> table = GenericSourceSink.read(pipeline, filePattern) .apply("StringToRow", new TextTableProvider.CsvToRow(tableSchema.getValue(), csvFormat)) .setCoder(tableSchema.getValue().getRowCoder()) @@ -242,7 +248,7 @@ public final class Tpch { idToQuery.put(5, QUERY5); idToQuery.put(6, QUERY6); - LOG.info("{} / {}", queryId, inputDirectory, outputFilePath); + LOG.info("{} / {} / {}", queryId, inputDirectory, outputFilePath); final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class); options.setRunner(NemoPipelineRunner.class); @@ -257,7 +263,7 @@ public final class Tpch { final PCollectionTuple tables = getHTables(p, csvFormat, inputDirectory); // Run the TPC-H query - final PCollection<Row> result = tables.apply(SqlTransform.query(idToQuery.get(queryId))); + final PCollection<Row> result = tables.apply(SqlTransform.query(getQueryString(queryId))); final PCollection<String> resultToWrite = result.apply(MapElements.into(TypeDescriptors.strings()).via( new SerializableFunction<Row, String>() { @@ -273,4 +279,46 @@ public final class Tpch { // Then run p.run(); } + + private static String getQueryString(final int queryNum) { + boolean isStarted = false; + final List<String> lines = new ArrayList<>(); + + + final String path = + "/Users/johnyang/Documents/workspace/hive-testbench/sample-queries-tpch/tpch_query" + queryNum + ".sql"; + + try (final Stream<String> stream = Files.lines(Paths.get(path))) { + for (final String line : stream.collect(Collectors.toList())) { + if (line.equals("select")) { + isStarted = true; + } + + if (isStarted) { + lines.add(line); + } + } + + } catch (IOException e) { + throw new RuntimeException(e); + } + + System.out.println(lines); + lines.remove(lines.size() - 1); + + final StringBuilder sb = new StringBuilder(); + lines.forEach(line -> { + sb.append(" "); + sb.append(line); + }); + + final String concate = sb.toString(); + System.out.println(concate); + final String cleanOne = concate.replaceAll("\n", " "); + System.out.println(cleanOne); + final String cleanTwo = cleanOne.replaceAll("\t", " "); + System.out.println(cleanTwo); + + return cleanTwo; + } } diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java index 87df9ae..80e9f68 100644 --- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java +++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java @@ -27,6 +27,15 @@ import org.junit.runner.RunWith; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import java.io.BufferedReader; +import java.io.FileReader; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + /** * Test TPC-H program with JobLauncher. */ @@ -58,6 +67,17 @@ public final class SQLTpchITCase { } @Test (timeout = TIMEOUT) + public void testXX() throws Exception { + final int queryNum = 12; + JobLauncher.main(builder + .addUserMain(Tpch.class.getCanonicalName()) + .addUserArgs(String.valueOf(queryNum), "/home/johnyangk/Desktop/tpc-concat-tbls/", outputFilePath) + .addJobId(SQLTpchITCase.class.getSimpleName()) + .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName()) + .build()); + } + + @Test (timeout = TIMEOUT) public void testThree() throws Exception { final int queryNum = 3; JobLauncher.main(builder
