This is an automated email from the ASF dual-hosted git repository. johnyangk pushed a commit to branch tpch-fix in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
commit 68ab8efc4464aa3d77f91943bb91ee0f3ddde90e Author: John Yang <[email protected]> AuthorDate: Sun Sep 9 18:57:01 2018 +0900 q6 success --- .../compiler/frontend/beam/PipelineTranslator.java | 12 +++++++ .../frontend/beam/transform/DoTransform.java | 38 +++++++++++++++++++++- .../nemo/examples/beam/GenericSourceSink.java | 2 +- .../org/apache/nemo/examples/beam/tpch/Tpch.java | 11 ++++--- .../apache/nemo/examples/beam/SQLTpchITCase.java | 7 ++-- 5 files changed, 60 insertions(+), 10 deletions(-) diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java index 6ef96bf..67b8828 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java @@ -117,6 +117,18 @@ public final class PipelineTranslator private static void parDoSingleOutputTranslator(final TranslationContext ctx, final PrimitiveTransformVertex transformVertex, final ParDo.SingleOutput<?, ?> transform) { + /* + final DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); + final Map<String, DoFnSignature.StateDeclaration> stateDeclarationMap = signature.stateDeclarations(); + for (final DoFnSignature.StateDeclaration declaration : stateDeclarationMap.values()) { + final TypeDescriptor<? extends State> stateType = declaration.stateType(); + if (stateType.isSubtypeOf(TypeDescriptor.of(ValueState.class))) { + declaration.field() + } else { + throw new UnsupportedOperationException("Only ValueState is supported at the moment"); + } + } + */ final DoTransform doTransform = new DoTransform(transform.getFn(), ctx.pipelineOptions); final IRVertex vertex = new OperatorVertex(doTransform); ctx.addVertex(vertex); diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoTransform.java index 3a36ec0..1935370 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoTransform.java @@ -16,6 +16,7 @@ package org.apache.nemo.compiler.frontend.beam.transform; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.beam.sdk.state.ValueState; import org.apache.nemo.common.ir.OutputCollector; import org.apache.nemo.common.ir.vertex.transform.Transform; import org.apache.nemo.runtime.executor.datatransfer.OutputCollectorImpl; @@ -35,7 +36,9 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; +import javax.annotation.Nullable; import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -200,6 +203,37 @@ public final class DoTransform<I, O> implements Transform<I, O> { private final ObjectMapper mapper; private final PipelineOptions options; + private final Map<String, TmpIntegerValueState> idToValueState; + private final Map<String, Integer> idToInteger; + private class TmpIntegerValueState implements ValueState<Integer> { + final String stateId; + + public TmpIntegerValueState(final String stateId) { + this.stateId = stateId; + } + + @Override + public void write(final Integer state) { + idToInteger.put(stateId, state); + } + + @Nullable + @Override + public Integer read() { + return idToInteger.get(stateId); + } + + @Override + public ValueState<Integer> readLater() { + return this; + } + + @Override + public void clear() { + // Do nothing. + } + } + /** * ProcessContext Constructor. * @@ -222,6 +256,8 @@ public final class DoTransform<I, O> implements Transform<I, O> { } catch (IOException e) { throw new RuntimeException(e); } + this.idToValueState = new HashMap<>(1); + this.idToInteger = new HashMap<>(1); } /** @@ -369,7 +405,7 @@ public final class DoTransform<I, O> implements Transform<I, O> { @Override public State state(final String stateId) { - throw new UnsupportedOperationException("state() in ProcessContext under DoTransform"); + return idToValueState.computeIfAbsent(stateId, TmpIntegerValueState::new); } @Override diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java index 51fd3bd..5ac8b20 100644 --- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java +++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java @@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory; * Helper class for handling source/sink in a generic way. * Assumes String-type PCollections. */ -final class GenericSourceSink { +public final class GenericSourceSink { /** * Default Constructor. */ diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java index 4f45920..ad93bd9 100644 --- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java +++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/tpch/Tpch.java @@ -26,10 +26,12 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.*; import org.apache.commons.csv.CSVFormat; import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions; import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner; +import org.apache.nemo.examples.beam.GenericSourceSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -250,24 +252,25 @@ public final class Tpch { final Pipeline p = Pipeline.create(options); // Create tables - final CSVFormat csvFormat = CSVFormat.MYSQL.withDelimiter('|').withNullString(""); + final CSVFormat csvFormat = CSVFormat.MYSQL + .withDelimiter('|') + .withNullString("") + .withTrailingDelimiter(); final PCollectionTuple tables = getHTables(p, csvFormat, inputDirectory); // Run the TPC-H query final PCollection<Row> result = tables.apply(SqlTransform.query(idToQuery.get(queryId))); - /* final PCollection<String> resultToWrite = result.apply(MapElements.into(TypeDescriptors.strings()).via( new SerializableFunction<Row, String>() { @Override - public String apply(Row input) { + public String apply(final Row input) { System.out.println("row: " + input.getValues()); return "row: " + input.getValues(); } })); GenericSourceSink.write(resultToWrite, outputFilePath); - */ // Then run p.run(); diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java index 0b7668e..427d053 100644 --- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java +++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/SQLTpchITCase.java @@ -17,6 +17,7 @@ package org.apache.nemo.examples.beam; import org.apache.nemo.client.JobLauncher; import org.apache.nemo.common.test.ArgBuilder; +import org.apache.nemo.common.test.ExampleTestUtil; import org.apache.nemo.examples.beam.policy.DefaultPolicyParallelismFive; import org.apache.nemo.examples.beam.tpch.Tpch; import org.junit.After; @@ -36,8 +37,8 @@ public final class SQLTpchITCase { private static ArgBuilder builder; private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/"; - private static final String outputFileName = "test_output_simplesql"; - private static final String expectedOutputFileName = "expected_output_simplesql"; + private static final String outputFileName = "test_output_tpch"; + private static final String expectedOutputFileName = "expected_output_tpch"; private static final String executorResourceFileName = fileBasePath + "beam_test_executor_resources.json"; private static final String outputFilePath = fileBasePath + outputFileName; @@ -49,13 +50,11 @@ public final class SQLTpchITCase { @After public void tearDown() throws Exception { - /* try { ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, expectedOutputFileName); } finally { ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName); } - */ } @Test (timeout = TIMEOUT)
