[BEAM-2709] Add TezRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8994f07e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8994f07e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8994f07e Branch: refs/heads/tez-runner Commit: 8994f07e052e1958909d59896bcc4eff39019f3a Parents: f10399d Author: Scheller <[email protected]> Authored: Tue Jun 20 14:03:44 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Fri Nov 17 12:33:57 2017 -0800 ---------------------------------------------------------------------- runners/pom.xml | 1 + runners/tez/pom.xml | 135 +++++++++++++++ .../beam/runners/tez/TezPipelineOptions.java | 27 +++ .../org/apache/beam/runners/tez/TezRunner.java | 79 +++++++++ .../beam/runners/tez/TezRunnerResult.java | 78 +++++++++ .../FlattenPCollectionTranslator.java | 34 ++++ .../tez/translation/GroupByKeyTranslator.java | 43 +++++ .../tez/translation/ParDoTranslator.java | 92 ++++++++++ .../tez/translation/ReadBoundedTranslator.java | 43 +++++ .../tez/translation/TezDoFnProcessor.java | 131 +++++++++++++++ .../tez/translation/TezPipelineTranslator.java | 139 ++++++++++++++++ .../tez/translation/TransformTranslator.java | 28 ++++ .../tez/translation/TranslationContext.java | 166 +++++++++++++++++++ .../runners/tez/translation/TranslatorUtil.java | 147 ++++++++++++++++ .../ViewCreatePCollectionViewTranslator.java | 35 ++++ .../tez/translation/WindowAssignTranslator.java | 35 ++++ .../tez/translation/WriteFilesTranslator.java | 45 +++++ .../tez/translation/io/MROutputManager.java | 67 ++++++++ .../tez/translation/io/NoOpOutputManager.java | 35 ++++ .../io/OrderedPartitionedKVOutputManager.java | 62 +++++++ .../translation/io/OutputManagerFactory.java | 39 +++++ .../tez/translation/io/TezOutputManager.java | 62 +++++++ .../io/UnorderedKVEdgeOutputManager.java | 57 +++++++ .../apache/beam/runners/tez/TezRunnerTest.java | 155 +++++++++++++++++ 
.../tez/translation/ParDoTranslatorTest.java | 120 ++++++++++++++ .../tez/translation/TezDoFnProcessorTest.java | 112 +++++++++++++ .../tez/translation/TranslationContextTest.java | 110 ++++++++++++ .../tez/translation/TranslatorUtilTest.java | 43 +++++ runners/tez/src/test/resources/test_input.txt | 2 + 29 files changed, 2122 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/pom.xml ---------------------------------------------------------------------- diff --git a/runners/pom.xml b/runners/pom.xml index 47f3c0e..2892fe8 100644 --- a/runners/pom.xml +++ b/runners/pom.xml @@ -65,6 +65,7 @@ </activation> <modules> <module>gearpump</module> + <module>tez</module> </modules> </profile> </profiles> http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/pom.xml ---------------------------------------------------------------------- diff --git a/runners/tez/pom.xml b/runners/tez/pom.xml new file mode 100644 index 0000000..b7d0d6d --- /dev/null +++ b/runners/tez/pom.xml @@ -0,0 +1,135 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + </plugins> + + <resources> + <resource> + <directory>src/main/resources</directory> + </resource> + </resources> + + </build> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-parent</artifactId> + <version>2.0.0</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-runners-tez</artifactId> + <version>2.0.0</version> + + <name>Apache Beam :: Runners :: Tez</name> + + <packaging>jar</packaging> + + <dependencies> + <!-- Tez --> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-api</artifactId> + <version>0.8.4</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <version>2.7.3</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>2.7.3</version> + </dependency> + + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-dag</artifactId> + <version>0.8.4</version> + </dependency> + + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-mapreduce</artifactId> + <version>0.8.4</version> + </dependency> + + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-runtime-library</artifactId> + <version>0.8.4</version> + </dependency> + + <!-- Beam --> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + 
</exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-core-construction-java</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-core-java</artifactId> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-harness</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/TezPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/TezPipelineOptions.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezPipelineOptions.java new file mode 100644 index 0000000..8b37b09 --- /dev/null +++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezPipelineOptions.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.tez; + +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Options that configure the Tez pipeline. + * + * <p>The interface combines {@link PipelineOptions} with {@link java.io.Serializable} and + * currently declares no Tez-specific options of its own (see the TODO in the body). + */ +public interface TezPipelineOptions extends PipelineOptions, java.io.Serializable { + //TODO: Add options to configure Tez +} http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunner.java ---------------------------------------------------------------------- diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunner.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunner.java new file mode 100644 index 0000000..7d32b47 --- /dev/null +++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunner.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.tez; + +import java.util.concurrent.atomic.AtomicReference; +import org.apache.beam.runners.tez.translation.TezPipelineTranslator; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsValidator; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link PipelineRunner} that translates the + * pipeline to an Tez DAG and executes it on a Tez cluster. + * + */ +public class TezRunner extends PipelineRunner<TezRunnerResult>{ + + private static final Logger LOG = LoggerFactory.getLogger(TezClient.class); + + private final TezPipelineOptions options; + + private TezRunner(TezPipelineOptions options){ + this.options = options; + } + + public static TezRunner fromOptions(PipelineOptions options) { + TezPipelineOptions tezOptions = PipelineOptionsValidator.validate(TezPipelineOptions.class,options); + return new TezRunner(tezOptions); + } + + @Override + public TezRunnerResult run(Pipeline pipeline) { + //Setup Tez Local Config + TezConfiguration config = new TezConfiguration(); + config.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); + config.set("fs.default.name", "file:///"); + config.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); + config.set(TezConfiguration.TEZ_TASK_LOG_LEVEL, "DEBUG"); + //TODO: Support Remote Tez Configuration + + final TezPipelineTranslator translator = new TezPipelineTranslator(options, config); + final AtomicReference<DAG> tezDAG = new AtomicReference<>(); + DAG dag = DAG.create(options.getJobName()); + tezDAG.set(dag); + 
translator.translate(pipeline, dag); + + TezClient client = TezClient.create("TezRun", config); + try { + client.start(); + client.submitDAG(dag); + } catch (Exception e){ + e.printStackTrace(); + } + + return new TezRunnerResult(client); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunnerResult.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunnerResult.java new file mode 100644 index 0000000..870c43c --- /dev/null +++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/TezRunnerResult.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.tez; + +import java.io.IOException; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.tez.client.TezAppMasterStatus; +import org.apache.tez.client.TezClient; +import org.joda.time.Duration; + +/** + * Result of executing a {@link org.apache.beam.sdk.Pipeline} with Tez. 
+ */ +public class TezRunnerResult implements PipelineResult { + + private final TezClient client; + private State state = State.UNKNOWN; + + public TezRunnerResult(TezClient client){ + this.client = client; + } + + @Override + public State getState() { + return state; + } + + @Override + public State waitUntilFinish() { + return waitUntilFinish(null); + } + + @Override + public State waitUntilFinish(Duration duration) { + long timeout = (duration == null || duration.getMillis() < 1) ? Long.MAX_VALUE + : System.currentTimeMillis() + duration.getMillis(); + try { + while (client.getAppMasterStatus() != TezAppMasterStatus.SHUTDOWN && System.currentTimeMillis() < timeout) { + Thread.sleep(500); + } + if (!client.getAppMasterStatus().equals(TezAppMasterStatus.SHUTDOWN)){ + return null; + } + return State.DONE; + } catch (Exception e){ + throw new RuntimeException(e); + } + } + + @Override + public State cancel() throws IOException { + //TODO: CODE TO CANCEL PIPELINE + return state; + } + + @Override + public MetricResults metrics() { + throw new UnsupportedOperationException(); + } + + +} http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/FlattenPCollectionTranslator.java ---------------------------------------------------------------------- diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/FlattenPCollectionTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/FlattenPCollectionTranslator.java new file mode 100644 index 0000000..f1f5aee --- /dev/null +++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/FlattenPCollectionTranslator.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.tez.translation; + +import org.apache.beam.sdk.transforms.Flatten; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link Flatten} translation to Tez equivalent. + */ +class FlattenPCollectionTranslator<T> implements TransformTranslator<Flatten.PCollections<T>> { + private static final Logger LOG = LoggerFactory.getLogger(FlattenPCollectionTranslator.class); + + @Override + public void translate(Flatten.PCollections<T> transform, TranslationContext context) { + //TODO: Translate transform to Tez and add to TranslationContext + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/GroupByKeyTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/GroupByKeyTranslator.java new file mode 100644 index 0000000..8f95752 --- /dev/null +++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/GroupByKeyTranslator.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.tez.translation; + +import com.google.common.collect.Iterables; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.values.PValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link GroupByKey} translation to Tez {@link org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig} + */ +class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>> { + private static final Logger LOG = LoggerFactory.getLogger(GroupByKey.class); + + @Override + public void translate(GroupByKey<K, V> transform, TranslationContext context) { + if (context.getCurrentInputs().size() > 1 ){ + throw new RuntimeException("Multiple Inputs are not yet supported"); + } else if (context.getCurrentOutputs().size() > 1){ + throw new RuntimeException("Multiple Outputs are not yet supported"); + } + PValue input = Iterables.getOnlyElement(context.getCurrentInputs().values()); + PValue output = Iterables.getOnlyElement(context.getCurrentOutputs().values()); + context.addShufflePair(input, output); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ParDoTranslator.java 
---------------------------------------------------------------------- diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ParDoTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ParDoTranslator.java new file mode 100644 index 0000000..9ce11e6 --- /dev/null +++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ParDoTranslator.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.tez.translation; + +import com.google.common.collect.Iterables; +import java.io.IOException; +import org.apache.beam.sdk.transforms.DoFn; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import org.apache.beam.sdk.transforms.ParDo.MultiOutput; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PValue; +import org.apache.commons.lang.NotImplementedException; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.api.Vertex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link org.apache.beam.sdk.transforms.ParDo} translation to Tez {@link Vertex}. + */ +class ParDoTranslator<InputT, OutputT> implements TransformTranslator<MultiOutput<InputT, OutputT>> { + private static final Logger LOG = LoggerFactory.getLogger(ParDoTranslator.class); + private static final String OUTPUT_TAG = "OUTPUT_TAG"; + private static final String DO_FN_INSTANCE_TAG = "DO_FN_INSTANCE"; + + @Override + public void translate(MultiOutput<InputT, OutputT> transform, TranslationContext context) { + //Prepare input/output targets + if (context.getCurrentInputs().size() > 1){ + throw new NotImplementedException("Multiple Inputs are not yet supported"); + } else if (context.getCurrentOutputs().size() > 1){ + throw new NotImplementedException("Multiple Outputs are not yet supported"); + } + PValue input = Iterables.getOnlyElement(context.getCurrentInputs().values()); + PValue output = Iterables.getOnlyElement(context.getCurrentOutputs().values()); + + //Prepare UserPayload Configuration + DoFn doFn = transform.getFn(); + String doFnInstance; + try { + doFnInstance = TranslatorUtil.toString(doFn); + } catch ( IOException e){ + throw new RuntimeException("DoFn failed to serialize: " + 
e.getMessage()); + } + Configuration config = new Configuration(); + config.set(OUTPUT_TAG, transform.getMainOutputTag().getId()); + config.set(DO_FN_INSTANCE_TAG, doFnInstance); + + //Check for shuffle input + boolean shuffle = false; + for (Pair<PValue, PValue> pair : context.getShuffleSet()){ + if (pair.getRight().equals(input)){ + shuffle = true; + } + } + + //Create Vertex with Payload + try { + UserPayload payload = TezUtils.createUserPayloadFromConf(config); + Vertex vertex; + if (shuffle) { + vertex = Vertex.create(context.getCurrentName(), ProcessorDescriptor.create(TezDoFnProcessor.class.getName()).setUserPayload(payload), 1); + //TODO: add customizable parallelism + } else { + vertex = Vertex.create(context.getCurrentName(), ProcessorDescriptor.create(TezDoFnProcessor.class.getName()).setUserPayload(payload)); + } + context.addVertex(context.getCurrentName(), vertex, input, output); + } catch (Exception e){ + throw new RuntimeException("Vertex Translation Failure from: " + e.getMessage()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ReadBoundedTranslator.java ---------------------------------------------------------------------- diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ReadBoundedTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ReadBoundedTranslator.java new file mode 100644 index 0000000..3192a81 --- /dev/null +++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ReadBoundedTranslator.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.tez.translation; + +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.Read.Bounded; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.tez.dag.api.DataSourceDescriptor; +import org.apache.tez.mapreduce.input.MRInput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link Bounded} translation to Tez {@link DataSourceDescriptor}. 
+ */ +class ReadBoundedTranslator<T> implements TransformTranslator<Read.Bounded<T>> { + private static final Logger LOG = LoggerFactory.getLogger(TransformTranslator.class); + + @Override + public void translate(Bounded<T> transform, TranslationContext context) { + //Build datasource and add to datasource map + DataSourceDescriptor dataSource = MRInput.createConfigBuilder(new Configuration(context.getConfig()), + TextInputFormat.class, transform.getSource().toString()).build(); + //TODO: Support Configurable Input Formats + context.getCurrentOutputs().forEach( (a, b) -> context.addSource(b, dataSource)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezDoFnProcessor.java ---------------------------------------------------------------------- diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezDoFnProcessor.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezDoFnProcessor.java new file mode 100644 index 0000000..0fde90c --- /dev/null +++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezDoFnProcessor.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.tez.translation; + +import com.google.common.collect.Iterables; +import java.util.LinkedList; +import org.apache.beam.fn.harness.fake.FakeStepContext; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.NullSideInputReader; +import org.apache.beam.runners.tez.translation.io.MROutputManager; +import org.apache.beam.runners.tez.translation.io.NoOpOutputManager; +import org.apache.beam.runners.tez.translation.io.OrderedPartitionedKVOutputManager; +import org.apache.beam.runners.tez.translation.io.OutputManagerFactory; +import org.apache.beam.runners.tez.translation.io.TezOutputManager; +import org.apache.beam.runners.tez.translation.io.UnorderedKVEdgeOutputManager; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.mapreduce.output.MROutput; +import org.apache.tez.runtime.api.LogicalInput; +import org.apache.tez.runtime.api.LogicalOutput; +import org.apache.tez.runtime.api.ProcessorContext; +import org.apache.tez.runtime.api.Reader; +import org.apache.tez.runtime.library.api.KeyValueReader; +import org.apache.tez.runtime.library.api.KeyValuesReader; +import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput; +import org.apache.tez.runtime.library.output.UnorderedKVOutput; +import org.apache.tez.runtime.library.processor.SimpleProcessor; + +/** + * TezDoFnProcessor is the Tez Wrapper to wrap user defined functions for Tez processing + * The DoFn 
is received through the {@link UserPayload} and then run using the simple {@link DoFnRunner} + */ +public class TezDoFnProcessor extends SimpleProcessor { + + private static final String OUTPUT_TAG = "OUTPUT_TAG"; + private static final String DO_FN_INSTANCE_TAG = "DO_FN_INSTANCE"; + + private DoFn<?,?> theDoFn; + private String outputTag; + + public TezDoFnProcessor(ProcessorContext context) { + super(context); + } + + @Override + public void initialize() throws Exception { + Configuration config = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); + outputTag = config.get(OUTPUT_TAG, null); + String doFnInstance = config.get(DO_FN_INSTANCE_TAG, null); + theDoFn = (DoFn) TranslatorUtil.fromString(doFnInstance); + super.initialize(); + } + + @Override + public void run() throws Exception { + //Setup Reader + KeyValueReader kvReader = null; + KeyValuesReader kvsReader = null; + LogicalInput input = Iterables.getOnlyElement(getInputs().values()); + Reader reader = input.getReader(); + if (reader instanceof KeyValueReader) { + kvReader = (KeyValueReader) reader; + } else if (reader instanceof KeyValuesReader) { + kvsReader = (KeyValuesReader) reader; + } else { + throw new RuntimeException("UNSUPPORTED READER!"); + } + + //Setup Writer + TezOutputManager outputManager; + if (getOutputs().size() == 1){ + LogicalOutput output = Iterables.getOnlyElement(getOutputs().values()); + outputManager = OutputManagerFactory.createOutputManager(output); + outputManager.before(); + } else if (getOutputs().size() == 0){ + outputManager = new NoOpOutputManager(); + } else { + throw new RuntimeException("Multiple outputs not yet supported"); + } + + //Initialize DoFnRunner + DoFnRunner runner = DoFnRunners.simpleRunner(PipelineOptionsFactory.create(), theDoFn, NullSideInputReader + .empty(), outputManager, new TupleTag<>(outputTag), new LinkedList<>(), + new FakeStepContext(), WindowingStrategy.globalDefault()); + runner.startBundle(); + + //Start Runner + if 
(kvsReader != null){ + while (kvsReader.next()){ + outputManager.setCurrentElement(WindowedValue.valueInGlobalWindow(TranslatorUtil.convertToJavaType(kvsReader.getCurrentKey()))); + runner.processElement(WindowedValue.valueInGlobalWindow(KV.of(TranslatorUtil.convertToJavaType(kvsReader.getCurrentKey()), + TranslatorUtil.convertIteratorToJavaType(kvsReader.getCurrentValues())))); + } + } else if (kvReader != null){ + while (kvReader.next()){ + WindowedValue value = WindowedValue.valueInGlobalWindow(TranslatorUtil.convertToJavaType(kvReader.getCurrentKey())); + outputManager.setCurrentElement(value); + runner.processElement(WindowedValue.valueInGlobalWindow(TranslatorUtil.convertToJavaType(kvReader.getCurrentValue()))); + } + } else { + throw new RuntimeException("UNSUPPORTED READER!"); + } + + outputManager.after(); + runner.finishBundle(); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezPipelineTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezPipelineTranslator.java new file mode 100644 index 0000000..7b4646a --- /dev/null +++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TezPipelineTranslator.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.tez.translation; + +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.runners.tez.TezPipelineOptions; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.WriteFiles; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.PValue; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.TezConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link TezPipelineTranslator} translates {@link Pipeline} objects + * into Tez logical plan {@link DAG}. + */ +public class TezPipelineTranslator implements Pipeline.PipelineVisitor { + + private static final Logger LOG = LoggerFactory.getLogger(TezPipelineTranslator.class); + + /** + * A map from {@link PTransform} subclass to the corresponding + * {@link TransformTranslator} to use to translate that transform. + */ + private static final Map<Class<? extends PTransform>, TransformTranslator> + transformTranslators = new HashMap<>(); + + private static final Map<Class<? 
extends PTransform>, TransformTranslator> + compositeTransformTranslators = new HashMap<>(); + + private final TranslationContext translationContext; + + static { + registerTransformTranslator(ParDo.MultiOutput.class, new ParDoTranslator<>()); + registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator<>()); + registerTransformTranslator(Window.Assign.class, new WindowAssignTranslator<>()); + registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator<>()); + registerTransformTranslator(Flatten.PCollections.class, new FlattenPCollectionTranslator<>()); + registerTransformTranslator(View.CreatePCollectionView.class, new ViewCreatePCollectionViewTranslator<>()); + registerCompositeTransformTranslator(WriteFiles.class, new WriteFilesTranslator()); + } + + public TezPipelineTranslator(TezPipelineOptions options, TezConfiguration config){ + translationContext = new TranslationContext(options, config); + } + + public void translate(Pipeline pipeline, DAG dag) { + pipeline.traverseTopologically(this); + translationContext.populateDAG(dag); + } + + /** + * Main visitor method called on each {@link PTransform} to transform them to Tez objects. + * @param node Pipeline node containing {@link PTransform} to be translated. 
+ */ + @Override + public void visitPrimitiveTransform(Node node) { + LOG.debug("visiting transform {}", node.getTransform()); + PTransform transform = node.getTransform(); + TransformTranslator translator = transformTranslators.get(transform.getClass()); + if (translator == null) { + throw new UnsupportedOperationException( + "no translator registered for " + transform); + } + translationContext.setCurrentTransform(node); + translator.translate(transform, translationContext); + } + + @Override + public CompositeBehavior enterCompositeTransform(Node node) { + LOG.debug("entering composite transform {}", node.getTransform()); + PTransform transform = node.getTransform(); + if (transform != null){ + TransformTranslator translator = compositeTransformTranslators.get(transform.getClass()); + if (translator != null) { + translationContext.setCurrentTransform(node); + translator.translate(transform, translationContext); + return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; + } + } + return CompositeBehavior.ENTER_TRANSFORM; + } + + @Override + public void leaveCompositeTransform(Node node) { + LOG.debug("leaving composite transform {}", node.getTransform()); + } + + @Override + public void visitValue(PValue value, Node producer) { + LOG.debug("visiting value {}", value); + } + + /** + * Records that instances of the specified PTransform class + * should be translated by default by the corresponding + * {@link TransformTranslator}. + */ + private static <TransformT extends PTransform> void registerTransformTranslator( + Class<TransformT> transformClass, TransformTranslator<? extends TransformT> transformTranslator) { + if (transformTranslators.put(transformClass, transformTranslator) != null) { + throw new IllegalArgumentException("defining multiple translators for " + transformClass); + } + } + + private static <TransformT extends PTransform> void registerCompositeTransformTranslator( + Class<TransformT> transformClass, TransformTranslator<? 
extends TransformT> transformTranslator) { + if (compositeTransformTranslators.put(transformClass, transformTranslator) != null) { + throw new IllegalArgumentException("defining multiple translators for " + transformClass); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TransformTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TransformTranslator.java new file mode 100644 index 0000000..736c840 --- /dev/null +++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TransformTranslator.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.tez.translation; + +import java.io.Serializable; +import org.apache.beam.sdk.transforms.PTransform; + +/** + * Translates {@link PTransform} to Tez functions. 
+ */ +interface TransformTranslator<T extends PTransform<?, ?>> extends Serializable { + void translate(T transform, TranslationContext context); +} http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslationContext.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslationContext.java new file mode 100644 index 0000000..1bffe95 --- /dev/null +++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslationContext.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.tez.translation; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.beam.runners.tez.TezPipelineOptions; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.io.BytesWritable; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.DataSinkDescriptor; +import org.apache.tez.dag.api.DataSourceDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.api.Edge; +import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig; +import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig; +import org.apache.tez.runtime.library.partitioner.HashPartitioner; + +/** + * Maintains context data for {@link TransformTranslator}s. + * Tracks and maintains each individual {@link Vertex} and their {@link Edge} connections. 
+ */ +public class TranslationContext { + + private final TezPipelineOptions pipelineOptions; + private final TezConfiguration config; + + private AppliedPTransform<?, ?, ?> currentTransform; + private String currentName; + private Map<TupleTag<?>, PValue> currentInputs; + private Map<TupleTag<?>, PValue> currentOutputs; + + private Map<String, Vertex> vertexMap = new HashMap<>(); + private Map<PValue, Vertex> vertexInputMap = new HashMap<>(); + private Map<PValue, Vertex> vertexOutputMap = new HashMap<>(); + + private Set<Pair<PValue, PValue>> shuffleSet = new HashSet<>(); + + private Map<PValue, DataSourceDescriptor> sourceMap = new HashMap<>(); + private Map<PValue, DataSinkDescriptor> sinkMap = new HashMap<>(); + + public TranslationContext(TezPipelineOptions options, TezConfiguration config){ + this.pipelineOptions = options; + this.config = config; + } + + public void setCurrentTransform(TransformHierarchy.Node treeNode) { + this.currentTransform = treeNode.toAppliedPTransform(); + this.currentInputs = treeNode.getInputs(); + this.currentOutputs = treeNode.getOutputs(); + this.currentName = treeNode.getFullName(); + } + + public void addVertex(String name, Vertex vertex, PValue input, PValue output) { + vertexMap.put(name, vertex); + vertexInputMap.put(input, vertex); + vertexOutputMap.put(output, vertex); + } + + public void addShufflePair(PValue input, PValue output) { + shuffleSet.add(Pair.of(input, output)); + } + + public Set<Pair<PValue, PValue>> getShuffleSet(){ + return this.shuffleSet; + } + + public void addSource(PValue output, DataSourceDescriptor dataSource) { + sourceMap.put(output, dataSource); + } + + public void addSink(PValue input, DataSinkDescriptor dataSink) { + sinkMap.put(input, dataSink); + } + + public TezConfiguration getConfig() { + return config; + } + + public AppliedPTransform<?, ?, ?> getCurrentTransform() { + return currentTransform; + } + + public String getCurrentName() { + return currentName; + } + + public Map<TupleTag<?>, 
PValue> getCurrentInputs() { + return currentInputs; + } + + public Map<TupleTag<?>, PValue> getCurrentOutputs() { + return currentOutputs; + } + + /** + * Populates the given Tez dag with the context's {@link Vertex} and {@link Edge}. + * @param dag Empty Tez dag to be populated. + */ + public void populateDAG(DAG dag){ + + for (Vertex v : vertexMap.values()){ + dag.addVertex(v); + } + + //Add Sources + sourceMap.forEach( (value, source) -> { + Vertex vertex = vertexInputMap.get(value); + if (vertex != null){ + vertex.addDataSource(value.getName(), source); + } + }); + + //Add Sinks + sinkMap.forEach( (value, source) -> { + Vertex vertex = vertexOutputMap.get(value); + if (vertex != null){ + vertex.addDataSink(value.getName(), source); + } + }); + + //Add Shuffle Edges + for (Pair<PValue, PValue> pair : shuffleSet){ + Vertex inputVertex = vertexOutputMap.get(pair.getLeft()); + Vertex outputVertex = vertexInputMap.get(pair.getRight()); + OrderedPartitionedKVEdgeConfig edgeConfig = OrderedPartitionedKVEdgeConfig.newBuilder( + BytesWritable.class.getName(), BytesWritable.class.getName(), HashPartitioner.class.getName()).build(); + dag.addEdge(Edge.create(inputVertex, outputVertex, edgeConfig.createDefaultEdgeProperty())); + } + + //Add Edges + vertexInputMap.forEach( (PValue inputValue, Vertex inputVertex) -> { + vertexOutputMap.forEach( (outputValue, outputVertex) -> { + if (inputValue.equals(outputValue)){ + UnorderedKVEdgeConfig edgeConfig = UnorderedKVEdgeConfig.newBuilder(BytesWritable.class.getName(), + BytesWritable.class.getName()).build(); + dag.addEdge(Edge.create(outputVertex, inputVertex, edgeConfig.createDefaultOneToOneEdgeProperty())); + } + }); + }); + } + + +} http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslatorUtil.java ---------------------------------------------------------------------- diff --git 
a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslatorUtil.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslatorUtil.java new file mode 100644 index 0000000..32b3ad0 --- /dev/null +++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/TranslatorUtil.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.tez.translation; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Base64; +import org.apache.beam.sdk.transforms.DoFn; +import java.util.List; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.ObjectWritable; +import org.apache.hadoop.io.ShortWritable; +import org.apache.hadoop.io.Text; + +/** + * Translator Utilities to convert between hadoop and java types. + */ +public class TranslatorUtil { + + /** + * Utility to convert java objects to bytes and place them in BytesWritable wrapper for hadoop use. + * @param element java object to be converted + * @return BytesWritable wrapped object + */ + public static Object convertToBytesWritable(Object element) { + byte[] bytes; + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutput out = new ObjectOutputStream(bos)) { + out.writeObject(element); + out.flush(); + bytes = bos.toByteArray(); + } catch (Exception e){ + throw new RuntimeException("Failed to serialize object into byte array: " + e.getMessage()); + } + if (bytes != null) { + return new BytesWritable(bytes); + } else { + throw new RuntimeException("Cannot convert null element to BytesWritable!"); + } + } + + /** + * Utility to convert hadoop objects back to their java equivalent. 
+ * @param element hadoop object to be converted + * @return original java object + */ + public static Object convertToJavaType(Object element) { + Object returnValue; + if (element instanceof BytesWritable){ + BytesWritable myElement = (BytesWritable) element; + byte[] data = myElement.getBytes(); + try (ByteArrayInputStream bis = new ByteArrayInputStream(data); + ObjectInput in = new ObjectInputStream(bis)) { + returnValue = in.readObject(); + } catch (Exception e){ + throw new RuntimeException("Failed to deserialize object from byte array: " + e.getMessage()); + } + } else if (element instanceof Text) { + returnValue = element.toString(); + } else if (element instanceof BooleanWritable) { + returnValue = ((BooleanWritable) element).get(); + } else if (element instanceof IntWritable){ + returnValue = ((IntWritable) element).get(); + } else if (element instanceof DoubleWritable){ + returnValue = ((DoubleWritable) element).get(); + } else if (element instanceof FloatWritable){ + returnValue = ((FloatWritable) element).get(); + } else if (element instanceof LongWritable){ + returnValue = ((LongWritable) element).get(); + } else if (element instanceof ShortWritable){ + returnValue = ((ShortWritable) element).get(); + } else if (element instanceof ObjectWritable){ + returnValue = ((ObjectWritable) element).get(); + } else { + throw new RuntimeException("Hadoop Type " + element.getClass() + " cannot be converted to Java!"); + } + return returnValue; + } + + /** + * Utility to convert hadoop objects within an iterable back to their java equivalent. + * @param iterable Iterable containing objects to be converted + * @return new Iterable with original java objects + */ + static Iterable<Object> convertIteratorToJavaType(Iterable<Object> iterable){ + List<Object> list = new ArrayList<>(); + iterable.iterator().forEachRemaining((Object element) -> list.add(convertToJavaType(element))); + return list; + } + + /** + * Utility to serialize a serializable object into a string. 
+ * @param object that is serializable to be serialized. + * @return serialized string + * @throws IOException thrown for serialization errors. + */ + public static String toString( Serializable object ) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(object); + oos.close(); + return Base64.getEncoder().encodeToString(baos.toByteArray()); + } + + /** + * Utility to deserialize a string into a serializable object. + * @param string containing serialized object. + * @return Original object + * @throws IOException thrown for serialization errors. + * @throws ClassNotFoundException thrown for serialization errors. + */ + public static Object fromString( String string ) throws IOException, ClassNotFoundException { + byte [] data = Base64.getDecoder().decode(string); + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data)); + Object object = ois.readObject(); + ois.close(); + return object; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ViewCreatePCollectionViewTranslator.java ---------------------------------------------------------------------- diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ViewCreatePCollectionViewTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ViewCreatePCollectionViewTranslator.java new file mode 100644 index 0000000..3fbb296 --- /dev/null +++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/ViewCreatePCollectionViewTranslator.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.tez.translation; + +import org.apache.beam.sdk.transforms.View; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link View.CreatePCollectionView} translation to Tez equivalent. + */ +class ViewCreatePCollectionViewTranslator<ElemT, ViewT> implements + TransformTranslator<View.CreatePCollectionView<ElemT, ViewT>> { + private static final Logger LOG = LoggerFactory.getLogger(ViewCreatePCollectionViewTranslator.class); + + @Override + public void translate(View.CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) { + //TODO: Translate transform to Tez and add to TranslationContext + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WindowAssignTranslator.java ---------------------------------------------------------------------- diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WindowAssignTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WindowAssignTranslator.java new file mode 100644 index 0000000..433e5a5 --- /dev/null +++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WindowAssignTranslator.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.tez.translation; + +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.Window.Assign; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link Assign} translation to Tez equivalent. 
+ */ +class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>> { + private static final Logger LOG = LoggerFactory.getLogger(WindowAssignTranslator.class); + + @Override + public void translate(Assign<T> transform, TranslationContext context) { + //TODO: Translate transform to Tez and add to TranslationContext + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WriteFilesTranslator.java ---------------------------------------------------------------------- diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WriteFilesTranslator.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WriteFilesTranslator.java new file mode 100644 index 0000000..312d8ae --- /dev/null +++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/WriteFilesTranslator.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.tez.translation; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.beam.sdk.io.WriteFiles; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.tez.dag.api.DataSinkDescriptor; +import org.apache.tez.mapreduce.output.MROutput; + +/** + * {@link MROutput} translation to Tez {@link DataSinkDescriptor}. + */ +class WriteFilesTranslator implements TransformTranslator<WriteFiles<?>> { + + @Override + public void translate(WriteFiles transform, TranslationContext context) { + Pattern pattern = Pattern.compile(".*\\{.*\\{value=(.*)}}.*"); + Matcher matcher = pattern.matcher(transform.getSink().getBaseOutputDirectoryProvider().toString()); + if (matcher.matches()){ + String output = matcher.group(1); + DataSinkDescriptor dataSink = MROutput.createConfigBuilder(new Configuration(context.getConfig()), + TextOutputFormat.class, output).build(); + + context.getCurrentInputs().forEach( (a, b) -> context.addSink(b, dataSink)); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/MROutputManager.java ---------------------------------------------------------------------- diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/MROutputManager.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/MROutputManager.java new file mode 100644 index 0000000..8305e1e --- /dev/null +++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/MROutputManager.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.tez.translation.io; + +import java.io.IOException; +import org.apache.beam.runners.tez.translation.TranslatorUtil; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.tez.mapreduce.output.MROutput; +import org.apache.tez.runtime.api.LogicalOutput; +import org.apache.tez.runtime.library.api.KeyValueWriter; + +/** + * {@link TezOutputManager} implementation that properly writes output to {@link MROutput} + */ +public class MROutputManager extends TezOutputManager { + + private MROutput output; + + public MROutputManager(LogicalOutput output) { + super(output); + if (output.getClass().equals(MROutput.class)){ + this.output = (MROutput) output; + try { + setWriter((KeyValueWriter) output.getWriter()); + } catch (Exception e) { + throw new RuntimeException("Error when retrieving writer for output" + e.getMessage()); + } + } else { + throw new RuntimeException("Incorrect OutputManager for: " + output.getClass()); + } + } + + @Override + public void after() { + try { + output.flush(); + output.commit(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { + try { + getWriter().write(null, TranslatorUtil.convertToBytesWritable(output.getValue())); + } catch (Exception e){ + throw new RuntimeException(e); 
+ } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/NoOpOutputManager.java ---------------------------------------------------------------------- diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/NoOpOutputManager.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/NoOpOutputManager.java new file mode 100644 index 0000000..725be0a --- /dev/null +++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/NoOpOutputManager.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.tez.translation.io; + +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; + +/** + * {@link TezOutputManager} implementation for when the {@link org.apache.tez.dag.api.Vertex} has no output. + * Used in cases such as when the ParDo within the Vertex writes the output itself. 
+ */ +public class NoOpOutputManager extends TezOutputManager { + + public NoOpOutputManager() { + super(null); + } + + @Override + public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {} +} http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OrderedPartitionedKVOutputManager.java ---------------------------------------------------------------------- diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OrderedPartitionedKVOutputManager.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OrderedPartitionedKVOutputManager.java new file mode 100644 index 0000000..4a652da --- /dev/null +++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OrderedPartitionedKVOutputManager.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.tez.translation.io; + +import org.apache.beam.runners.tez.translation.TranslatorUtil; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.tez.runtime.api.LogicalOutput; +import org.apache.tez.runtime.library.api.KeyValueWriter; +import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput; + +/** + * {@link TezOutputManager} implementation that properly writes output to {@link OrderedPartitionedKVOutput} + */ +public class OrderedPartitionedKVOutputManager extends TezOutputManager { + + private OrderedPartitionedKVOutput output; + + public OrderedPartitionedKVOutputManager(LogicalOutput output) { + super(output); + if (output.getClass().equals(OrderedPartitionedKVOutput.class)){ + this.output = (OrderedPartitionedKVOutput) output; + try { + setWriter((KeyValueWriter) output.getWriter()); + } catch (Exception e) { + throw new RuntimeException("Error when retrieving writer for output" + e.getMessage()); + } + } else { + throw new RuntimeException("Incorrect OutputManager for: " + output.getClass()); + } + } + + @Override + public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { + try { + if (output.getValue() instanceof KV) { + getWriter().write(TranslatorUtil.convertToBytesWritable(((KV) output.getValue()).getKey()), + TranslatorUtil.convertToBytesWritable(((KV) output.getValue()).getValue())); + } else { + throw new IllegalArgumentException("GroupByKey can only group Key-Value outputs!"); + } + } catch (Exception e){ + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OutputManagerFactory.java ---------------------------------------------------------------------- diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OutputManagerFactory.java 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OutputManagerFactory.java new file mode 100644 index 0000000..807de3f --- /dev/null +++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/OutputManagerFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.tez.translation.io; + +import org.apache.tez.mapreduce.output.MROutput; +import org.apache.tez.runtime.api.LogicalOutput; +import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput; +import org.apache.tez.runtime.library.output.UnorderedKVOutput; + +public class OutputManagerFactory { + public static TezOutputManager createOutputManager(LogicalOutput output){ + TezOutputManager outputManager; + if (output.getClass().equals(OrderedPartitionedKVOutput.class)){ + outputManager = new OrderedPartitionedKVOutputManager(output); + } else if (output.getClass().equals(UnorderedKVOutput.class)){ + outputManager = new UnorderedKVEdgeOutputManager(output); + } else if (output.getClass().equals(MROutput.class)){ + outputManager = new MROutputManager(output); + } else { + throw new RuntimeException("Output type: " + output.getClass() + " is unsupported"); + } + return outputManager; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/TezOutputManager.java ---------------------------------------------------------------------- diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/TezOutputManager.java b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/TezOutputManager.java new file mode 100644 index 0000000..2999ee5 --- /dev/null +++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/TezOutputManager.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.tez.translation.io; + +import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.tez.runtime.api.LogicalOutput; +import org.apache.tez.runtime.library.api.KeyValueWriter; + +/** + * Abstract Output Manager that adds before and after methods to the {@link DoFnRunners.OutputManager} + * interface so that outputs that require them can be added and used with the TezRunner. + */ +public abstract class TezOutputManager implements DoFnRunners.OutputManager { + + private WindowedValue currentElement; + private KeyValueWriter writer; + private LogicalOutput output; + + public TezOutputManager(LogicalOutput output){ + this.output = output; + } + + public void before() {} + + public void after() {} + + public void setCurrentElement(WindowedValue currentElement) { + this.currentElement = currentElement; + } + + public WindowedValue getCurrentElement(){ + return currentElement; + } + + public void setWriter(KeyValueWriter writer) { + this.writer = writer; + } + + public KeyValueWriter getWriter() { + return writer; + } + + public LogicalOutput getOutput() { + return output; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/UnorderedKVEdgeOutputManager.java ---------------------------------------------------------------------- diff --git a/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/UnorderedKVEdgeOutputManager.java 
b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/UnorderedKVEdgeOutputManager.java new file mode 100644 index 0000000..34cb371 --- /dev/null +++ b/runners/tez/src/main/java/org/apache/beam/runners/tez/translation/io/UnorderedKVEdgeOutputManager.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.tez.translation.io; + +import org.apache.beam.runners.tez.translation.TranslatorUtil; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.tez.runtime.api.LogicalOutput; +import org.apache.tez.runtime.library.api.KeyValueWriter; +import org.apache.tez.runtime.library.output.UnorderedKVOutput; + +/** + * {@link TezOutputManager} implementation that properly writes output to {@link UnorderedKVOutput} + */ +public class UnorderedKVEdgeOutputManager extends TezOutputManager { + + private UnorderedKVOutput output; + + public UnorderedKVEdgeOutputManager(LogicalOutput output) { + super(output); + if (output.getClass().equals(UnorderedKVOutput.class)){ + this.output = (UnorderedKVOutput) output; + try { + setWriter((KeyValueWriter) output.getWriter()); + } catch (Exception e) { + throw new RuntimeException("Error when retrieving writer for output" + e.getMessage()); + } + } else { + throw new RuntimeException("Incorrect OutputManager for: " + output.getClass()); + } + } + + @Override + public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { + try { + getWriter().write(TranslatorUtil.convertToBytesWritable(getCurrentElement().getValue()), + TranslatorUtil.convertToBytesWritable(output.getValue())); + } catch (Exception e){ + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/test/java/org/apache/beam/runners/tez/TezRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/tez/src/test/java/org/apache/beam/runners/tez/TezRunnerTest.java b/runners/tez/src/test/java/org/apache/beam/runners/tez/TezRunnerTest.java new file mode 100644 index 0000000..66e9f19 --- /dev/null +++ b/runners/tez/src/test/java/org/apache/beam/runners/tez/TezRunnerTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.tez; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests for the Tez runner. 
+ */ +public class TezRunnerTest { + + private static final String TOKENIZER_PATTERN = "[^\\p{L}]+"; + private static final String INPUT_LOCATION = "src/test/resources/test_input.txt"; + + private static Pipeline tezPipeline; + private static Pipeline directPipeline; + + @Before + public void setupPipelines(){ + //TezRunner Pipeline + PipelineOptions tezOptions = PipelineOptionsFactory.create(); + tezOptions.setRunner(TezRunner.class); + tezPipeline = Pipeline.create(tezOptions); + + //DirectRunner Pipeline + PipelineOptions options = PipelineOptionsFactory.create(); + directPipeline = Pipeline.create(options); + } + + @Test + public void simpleTest() throws Exception { + tezPipeline.apply(TextIO.read().from(INPUT_LOCATION)) + .apply(ParDo.of(new AddHelloWorld())) + .apply(ParDo.of(new TestTezFn())); + + directPipeline.apply(TextIO.read().from(INPUT_LOCATION)) + .apply(ParDo.of(new AddHelloWorld())) + .apply(ParDo.of(new TestDirectFn())); + + tezPipeline.run().waitUntilFinish(); + directPipeline.run().waitUntilFinish(); + Assert.assertEquals(TestDirectFn.RESULTS, TestTezFn.RESULTS); + } + + @Test + public void wordCountTest() throws Exception { + tezPipeline.apply("ONE", TextIO.read().from(INPUT_LOCATION)) + .apply("TWO", ParDo.of(new TokenDoFn())) + .apply("THREE", GroupByKey.create()) + .apply("FOUR", ParDo.of(new ProcessDoFn())) + .apply("FIVE", ParDo.of(new TestTezFn())); + + directPipeline.apply("ONE", TextIO.read().from(INPUT_LOCATION)) + .apply("TWO", ParDo.of(new TokenDoFn())) + .apply("THREE", GroupByKey.create()) + .apply("FOUR", ParDo.of(new ProcessDoFn())) + .apply("FIVE", ParDo.of(new TestDirectFn())); + + tezPipeline.run().waitUntilFinish(); + directPipeline.run().waitUntilFinish(); + Assert.assertEquals(TestDirectFn.RESULTS, TestTezFn.RESULTS); + } + + private static class AddHelloWorld extends DoFn<String, String>{ + @ProcessElement + public void processElement(ProcessContext c) { + + // Split the line into words. 
+ String[] words = c.element().split(TOKENIZER_PATTERN); + // Output each word encountered into the output PCollection. + for (String word : words) { + if (!word.isEmpty()) { + c.output(word + " HelloWorld"); + } + } + } + } + + public static class TokenDoFn extends DoFn<String, KV<String, Integer>>{ + @ProcessElement + public void processElement(ProcessContext c){ + for( String word : c.element().split(TOKENIZER_PATTERN)){ + if(!word.isEmpty()){ + c.output(KV.of(word, 1)); + } + } + } + } + + public static class ProcessDoFn extends DoFn<KV<String,Iterable<Integer>>, String>{ + @ProcessElement + public void processElement(ProcessContext c){ + Integer sum = 0; + for( Integer integer : c.element().getValue()){ + sum = sum + integer; + } + c.output(c.element().getKey() + ": " + sum); + } + } + + private static class TestTezFn extends DoFn<String, String> { + private static final Set<String> RESULTS = Collections.synchronizedSet(new HashSet<>()); + + public TestTezFn(){ + RESULTS.clear(); + } + + @ProcessElement + public void processElement(ProcessContext c) { + RESULTS.add(c.element()); + } + } + + private static class TestDirectFn extends DoFn<String, String> { + private static final Set<String> RESULTS = Collections.synchronizedSet(new HashSet<>()); + + public TestDirectFn(){ + RESULTS.clear(); + } + + @ProcessElement + public void processElement(ProcessContext c) { + RESULTS.add(c.element()); + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/ParDoTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/ParDoTranslatorTest.java b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/ParDoTranslatorTest.java new file mode 100644 index 0000000..df3532d --- /dev/null +++ 
b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/ParDoTranslatorTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.tez.translation; + +import com.google.common.collect.Iterables; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.runners.tez.TezPipelineOptions; +import org.apache.beam.runners.tez.TezRunner; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.PipelineOptionsValidator; +import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.MultiOutput; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.PValueBase; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.hadoop.conf.Configuration; +import 
org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.Vertex; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests for the ParDoTranslator class + */ +public class ParDoTranslatorTest { + + private static final String DO_FN_INSTANCE_TAG = "DO_FN_INSTANCE"; + private static final String TEST_TAG = "TestName"; + private static TransformHierarchy hierarchy; + private static PValue pvalue; + private static DAG dag; + private static TranslationContext context; + private static ParDoTranslator translator; + + @Test + public void testParDoTranslation() throws Exception { + MultiOutput parDo = ParDo.of(new TestDoFn()).withOutputTags(new TupleTag<>(), TupleTagList.of(new TupleTag<String>())); + Node node = hierarchy.pushNode(TEST_TAG, pvalue, parDo); + hierarchy.setOutput(pvalue); + context.setCurrentTransform(node); + translator.translate(parDo, context); + context.populateDAG(dag); + Vertex vertex = Iterables.getOnlyElement(dag.getVertices()); + Configuration config = TezUtils.createConfFromUserPayload(vertex.getProcessorDescriptor().getUserPayload()); + String doFnString = config.get(DO_FN_INSTANCE_TAG); + DoFn doFn = (DoFn) TranslatorUtil.fromString(doFnString); + + Assert.assertEquals(vertex.getProcessorDescriptor().getClassName(), TezDoFnProcessor.class.getName()); + Assert.assertEquals(doFn.getClass(), TestDoFn.class); + } + + @Before + public void setupTest(){ + dag = DAG.create(TEST_TAG); + translator = new ParDoTranslator(); + PipelineOptions options = PipelineOptionsFactory.create(); + options.setRunner(TezRunner.class); + TezPipelineOptions tezOptions = PipelineOptionsValidator.validate(TezPipelineOptions.class, options); + context = new TranslationContext(tezOptions, new TezConfiguration()); + hierarchy = new TransformHierarchy(Pipeline.create()); + PValue innerValue = new PValueBase() { + @Override + public String getName() 
{return null;} + }; + pvalue = new PValue() { + @Override + public String getName() {return null;} + + @Override + public Map<TupleTag<?>, PValue> expand() { + Map<TupleTag<?>, PValue> map = new HashMap<>(); + map.put(new TupleTag<>(), innerValue); + return map; + } + + @Override + public void finishSpecifying(PInput upstreamInput, PTransform<?, ?> upstreamTransform) {} + + @Override + public Pipeline getPipeline() {return null;} + + @Override + public void finishSpecifyingOutput(String transformName, PInput input, PTransform<?, ?> transform) {} + }; + } + + private static class TestDoFn extends DoFn<String, String> { + @ProcessElement + public void processElement(ProcessContext c) { + //Test DoFn + } + } +}
