Repository: beam Updated Branches: refs/heads/tez-runner f10399d7c -> da4687a6e
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TezDoFnProcessorTest.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TezDoFnProcessorTest.java b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TezDoFnProcessorTest.java
new file mode 100644
index 0000000..cfd8e41
--- /dev/null
+++ b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TezDoFnProcessorTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the TezDoFnProcessor that wraps beam DoFns.
+ */
+public class TezDoFnProcessorTest {
+
+  private static final String TOKENIZER_PATTERN = "[^\\p{L}]+";
+  private static final String INPUT_LOCATION = "src/test/resources/test_input.txt";
+
+  // Rebuilt for every test by setUp() so "TestVertex" is never added twice to the same DAG.
+  private DAG dag;
+  private TezClient client;
+
+  @Before
+  public void setUp() {
+    dag = DAG.create("TestDag");
+    TezConfiguration config = new TezConfiguration();
+    config.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    config.set("fs.default.name", "file:///");
+    config.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    config.set(TezConfiguration.TEZ_TASK_LOG_LEVEL, "DEBUG");
+    client = TezClient.create("TezClient", config);
+  }
+
+  /**
+   * Runs {@link TestWordsFn} inside a local-mode Tez DAG over the test input file and checks
+   * that the DoFn saw exactly the words contained in that file.
+   */
+  @Test
+  public void testDoFn() throws Exception {
+    String expected =
+        FileUtils.readFileToString(new File(INPUT_LOCATION), StandardCharsets.UTF_8);
+    Set<String> expectedSet = new HashSet<>(Arrays.asList(expected.split(TOKENIZER_PATTERN)));
+    // split() yields a leading "" when the text starts with a delimiter; the DoFn skips those.
+    expectedSet.remove("");
+
+    DoFn<?, ?> doFn = new TestWordsFn();
+    String doFnInstance = TranslatorUtil.toString(doFn);
+
+    Configuration config = new Configuration();
+    config.set("OUTPUT_TAG", new TupleTag<>().getId());
+    config.set("DO_FN_INSTANCE", doFnInstance);
+    UserPayload payload = TezUtils.createUserPayloadFromConf(config);
+
+    Vertex vertex = Vertex.create("TestVertex", ProcessorDescriptor
+        .create(TezDoFnProcessor.class.getName()).setUserPayload(payload));
+    vertex.addDataSource("TestInput", MRInput.createConfigBuilder(new Configuration(),
+        TextInputFormat.class, INPUT_LOCATION).build());
+    dag.addVertex(vertex);
+
+    client.start();
+    try {
+      // Block until the DAG finishes instead of busy-spinning on the AM status, and make sure
+      // it actually succeeded before inspecting the collected words.
+      DAGClient dagClient = client.submitDAG(dag);
+      DAGStatus status = dagClient.waitForCompletion();
+      Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState());
+    } finally {
+      // Always stop the client, even if submission or the DAG itself fails.
+      client.stop();
+    }
+
+    Assert.assertEquals(expectedSet, TestWordsFn.RESULTS);
+  }
+
+  /** DoFn that records every non-empty word of each input line in {@code RESULTS}. */
+  private static class TestWordsFn extends DoFn<String, String> {
+    // Static so the test can read what was collected; presumably the processor runs its own
+    // copy rebuilt from DO_FN_INSTANCE, so this relies on Tez local mode (set in setUp)
+    // running tasks in this JVM.
+    private static final Set<String> RESULTS = Collections.synchronizedSet(new HashSet<>());
+
+    public TestWordsFn() {
+      RESULTS.clear();
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      // Split the line into words.
+      String[] words = c.element().split(TOKENIZER_PATTERN);
+      // Record each non-empty word; nothing is emitted to an output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          RESULTS.add(word);
+        }
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TranslationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TranslationContextTest.java b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TranslationContextTest.java
new file mode 100644
index 0000000..2efca6a
--- /dev/null
+++ b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TranslationContextTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.runners.tez.TezPipelineOptions;
+import org.apache.beam.runners.tez.TezRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.PValueBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the TranslationContext class.
+ */
+public class TranslationContextTest {
+
+  private static final String TEST_SOURCE = "Test.txt";
+  private static final String DAG_NAME = "TestDag";
+  private static final String VERTEX1_NAME = "TestVertex1";
+  private static final String VERTEX2_NAME = "TestVertex2";
+  private static final String PVALUE_NAME = "TestPValue";
+
+  // Per-test state rebuilt by setUp(). Instance fields (not static): JUnit creates a new
+  // instance for each test method, so nothing is shared between tests.
+  private PValue value1;
+  private PValue value2;
+  private PValue value3;
+  private TranslationContext context;
+  private DAG dag;
+
+  @Before
+  public void setUp() {
+    dag = DAG.create(DAG_NAME);
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(TezRunner.class);
+    TezPipelineOptions tezOptions =
+        PipelineOptionsValidator.validate(TezPipelineOptions.class, options);
+    context = new TranslationContext(tezOptions, new TezConfiguration());
+    value1 = newPValue();
+    value2 = newPValue();
+    value3 = newPValue();
+  }
+
+  /** Returns a new, distinct {@link PValue} whose name is {@code PVALUE_NAME}. */
+  private static PValue newPValue() {
+    return new PValueBase() {
+      @Override
+      public String getName() {
+        return PVALUE_NAME;
+      }
+    };
+  }
+
+  /** Vertices registered with the shared value2 should be connected vertex1 -> vertex2. */
+  @Test
+  public void testVertexConnect() throws Exception {
+    Vertex vertex1 = Vertex.create(VERTEX1_NAME, ProcessorDescriptor.create(TezDoFnProcessor.class.getName()));
+    Vertex vertex2 = Vertex.create(VERTEX2_NAME, ProcessorDescriptor.create(TezDoFnProcessor.class.getName()));
+    context.addVertex(VERTEX1_NAME, vertex1, value1, value2);
+    context.addVertex(VERTEX2_NAME, vertex2, value2, value3);
+    context.populateDAG(dag);
+    Vertex vertex1Output = Iterables.getOnlyElement(dag.getVertex(VERTEX1_NAME).getOutputVertices());
+    Vertex vertex2Input = Iterables.getOnlyElement(dag.getVertex(VERTEX2_NAME).getInputVertices());
+
+    Assert.assertEquals(vertex2, vertex1Output);
+    Assert.assertEquals(vertex1, vertex2Input);
+  }
+
+  /** A data source added for value1 should become vertex1's only data source. */
+  @Test
+  public void testDataSourceConnect() throws Exception {
+    Vertex vertex1 = Vertex.create(VERTEX1_NAME, ProcessorDescriptor.create(TezDoFnProcessor.class.getName()));
+    context.addVertex(VERTEX1_NAME, vertex1, value1, value2);
+    DataSourceDescriptor dataSource = MRInput.createConfigBuilder(new Configuration(context.getConfig()),
+        TextInputFormat.class, TEST_SOURCE).groupSplits(true).generateSplitsInAM(true).build();
+    context.addSource(value1, dataSource);
+    context.populateDAG(dag);
+    DataSourceDescriptor vertex1Source = Iterables.getOnlyElement(dag.getVertex(VERTEX1_NAME).getDataSources());
+
+    Assert.assertEquals(dataSource, vertex1Source);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TranslatorUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TranslatorUtilTest.java b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TranslatorUtilTest.java
new file mode 100644
index 0000000..b2bb5fc
--- /dev/null
+++ b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TranslatorUtilTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the Tez TranslatorUtil class.
+ */
+public class TranslatorUtilTest {
+
+  /** A DoFn round-tripped through toString/fromString keeps its concrete class. */
+  @Test
+  public void testDoFnSerialization() throws Exception {
+    DoFn<String, String> doFn = new TestDoFn();
+    String doFnString = TranslatorUtil.toString(doFn);
+    DoFn<?, ?> newDoFn = (DoFn<?, ?>) TranslatorUtil.fromString(doFnString);
+    Assert.assertEquals(doFn.getClass(), newDoFn.getClass());
+  }
+
+  /** Minimal DoFn used only as a serialization subject; its process method does nothing. */
+  private static class TestDoFn extends DoFn<String, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      // Intentionally empty: this test only checks that the class survives the round trip.
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/test/resources/test_input.txt
----------------------------------------------------------------------
diff --git a/runners/tez/src/test/resources/test_input.txt b/runners/tez/src/test/resources/test_input.txt
new file mode 100644
index 0000000..383be42
--- /dev/null
+++ b/runners/tez/src/test/resources/test_input.txt
@@ -0,0 +1,2 @@
+This is an example of splitting up lines.
+Then formatting those lines into individual tokens.
\ No newline at end of file
