Repository: beam
Updated Branches:
  refs/heads/tez-runner f10399d7c -> da4687a6e


http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TezDoFnProcessorTest.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TezDoFnProcessorTest.java b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TezDoFnProcessorTest.java
new file mode 100644
index 0000000..cfd8e41
--- /dev/null
+++ b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TezDoFnProcessorTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.tez.client.TezAppMasterStatus;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the {@link TezDoFnProcessor} that wraps Beam DoFns.
+ *
+ * <p>The test builds a single-vertex DAG whose processor is a {@link TezDoFnProcessor} carrying a
+ * serialized word-splitting DoFn, runs it through a Tez client configured for local mode, and
+ * compares the words the DoFn recorded with the words found in the input file.
+ */
+public class TezDoFnProcessorTest {
+
+  private static final String TOKENIZER_PATTERN = "[^\\p{L}]+";
+  private static final String INPUT_LOCATION = "src/test/resources/test_input.txt";
+  /** Pause between two polls of the application master status, in milliseconds. */
+  private static final long POLL_INTERVAL_MILLIS = 100L;
+
+  // Created per test in setUp() so that no Tez state is shared between test methods.
+  private DAG dag;
+  private TezClient client;
+
+  /** Creates a fresh DAG and a Tez client configured for local mode on the local file system. */
+  @Before
+  public void setUp() {
+    TezConfiguration config = new TezConfiguration();
+    config.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    config.set("fs.default.name", "file:///");
+    config.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+    config.set(TezConfiguration.TEZ_TASK_LOG_LEVEL, "DEBUG");
+    client = TezClient.create("TezClient", config);
+    dag = DAG.create("TestDag");
+  }
+
+  /**
+   * Runs {@link TestWordsFn} inside a {@link TezDoFnProcessor} over the test input file and
+   * verifies that exactly the distinct words of the file were recorded.
+   *
+   * @throws Exception if the input cannot be read or the Tez client fails
+   */
+  @Test
+  public void testDoFn() throws Exception {
+    // Read with an explicit charset instead of the platform default.
+    String expected = FileUtils.readFileToString(new File(INPUT_LOCATION), "UTF-8");
+    Set<String> expectedSet = new HashSet<>(Arrays.asList(expected.split(TOKENIZER_PATTERN)));
+    // split() yields a leading "" when the text starts with a delimiter; the DoFn skips empty
+    // words, so the expectation must not contain one either.
+    expectedSet.remove("");
+
+    DoFn<?, ?> doFn = new TestWordsFn();
+    String doFnInstance = TranslatorUtil.toString(doFn);
+
+    // The processor receives the serialized DoFn and its output tag through the user payload.
+    Configuration config = new Configuration();
+    config.set("OUTPUT_TAG", new TupleTag<>().getId());
+    config.set("DO_FN_INSTANCE", doFnInstance);
+    UserPayload payload = TezUtils.createUserPayloadFromConf(config);
+
+    Vertex vertex = Vertex.create("TestVertex", ProcessorDescriptor
+        .create(TezDoFnProcessor.class.getName()).setUserPayload(payload));
+    vertex.addDataSource("TestInput", MRInput.createConfigBuilder(new Configuration(),
+        TextInputFormat.class, INPUT_LOCATION).build());
+
+    dag.addVertex(vertex);
+    client.start();
+    try {
+      client.submitDAG(dag);
+      // Poll with a pause rather than spinning a CPU core while the DAG runs.
+      while (client.getAppMasterStatus() != TezAppMasterStatus.SHUTDOWN) {
+        Thread.sleep(POLL_INTERVAL_MILLIS);
+      }
+    } finally {
+      // Always release the client, including when submission or polling fails.
+      client.stop();
+    }
+
+    Assert.assertEquals(expectedSet, TestWordsFn.RESULTS);
+  }
+
+  /**
+   * DoFn that splits every input line into words and records the non-empty ones in
+   * {@link #RESULTS}.
+   *
+   * <p>NOTE(review): the results live in a static set, which presumably relies on local mode
+   * running the processor in the same JVM as the test - confirm before reusing elsewhere.
+   */
+  private static class TestWordsFn extends DoFn<String, String> {
+    private static final Set<String> RESULTS =
+        Collections.synchronizedSet(new HashSet<String>());
+
+    /** Clears results recorded by earlier instances so each test starts from an empty set. */
+    public TestWordsFn() {
+      RESULTS.clear();
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      // Split the line into words.
+      String[] words = c.element().split(TOKENIZER_PATTERN);
+      // Record each non-empty word; nothing is emitted to an output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          RESULTS.add(word);
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TranslationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TranslationContextTest.java b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TranslationContextTest.java
new file mode 100644
index 0000000..2efca6a
--- /dev/null
+++ b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TranslationContextTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.runners.tez.TezPipelineOptions;
+import org.apache.beam.runners.tez.TezRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.PValueBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests covering how {@link TranslationContext} wires vertices and sources into a Tez DAG.
+ */
+public class TranslationContextTest {
+
+  private static final String TEST_SOURCE = "Test.txt";
+  private static final String DAG_NAME = "TestDag";
+  private static final String VERTEX1_NAME = "TestVertex1";
+  private static final String VERTEX2_NAME = "TestVertex2";
+  private static final String PVALUE_NAME = "TestPValue";
+
+  private static PValue value1;
+  private static PValue value2;
+  private static PValue value3;
+  private static TranslationContext context;
+  private static DAG dag;
+
+  /** Rebuilds the DAG, the translation context and the three placeholder values per test. */
+  @Before
+  public void setUp() {
+    dag = DAG.create(DAG_NAME);
+
+    PipelineOptions baseOptions = PipelineOptionsFactory.create();
+    baseOptions.setRunner(TezRunner.class);
+    TezPipelineOptions validated =
+        PipelineOptionsValidator.validate(TezPipelineOptions.class, baseOptions);
+    context = new TranslationContext(validated, new TezConfiguration());
+
+    value1 = newNamedValue();
+    value2 = newNamedValue();
+    value3 = newNamedValue();
+  }
+
+  /**
+   * Two vertices that share a value (output of the first, input of the second) must end up
+   * connected to each other once the DAG is populated.
+   */
+  @Test
+  public void testVertexConnect() throws Exception {
+    Vertex upstream = newDoFnVertex(VERTEX1_NAME);
+    Vertex downstream = newDoFnVertex(VERTEX2_NAME);
+
+    context.addVertex(VERTEX1_NAME, upstream, value1, value2);
+    context.addVertex(VERTEX2_NAME, downstream, value2, value3);
+    context.populateDAG(dag);
+
+    Vertex onlyOutputOfFirst =
+        Iterables.getOnlyElement(dag.getVertex(VERTEX1_NAME).getOutputVertices());
+    Vertex onlyInputOfSecond =
+        Iterables.getOnlyElement(dag.getVertex(VERTEX2_NAME).getInputVertices());
+
+    Assert.assertEquals(downstream, onlyOutputOfFirst);
+    Assert.assertEquals(upstream, onlyInputOfSecond);
+  }
+
+  /**
+   * A source registered for a vertex's input value must show up as that vertex's only data
+   * source once the DAG is populated.
+   */
+  @Test
+  public void testDataSourceConnect() throws Exception {
+    Vertex consumer = newDoFnVertex(VERTEX1_NAME);
+    context.addVertex(VERTEX1_NAME, consumer, value1, value2);
+
+    Configuration sourceConfig = new Configuration(context.getConfig());
+    DataSourceDescriptor dataSource = MRInput
+        .createConfigBuilder(sourceConfig, TextInputFormat.class, TEST_SOURCE)
+        .groupSplits(true)
+        .generateSplitsInAM(true)
+        .build();
+    context.addSource(value1, dataSource);
+    context.populateDAG(dag);
+
+    DataSourceDescriptor attachedSource =
+        Iterables.getOnlyElement(dag.getVertex(VERTEX1_NAME).getDataSources());
+
+    Assert.assertEquals(dataSource, attachedSource);
+  }
+
+  /** Returns a new, distinct placeholder value whose name is {@link #PVALUE_NAME}. */
+  private static PValue newNamedValue() {
+    return new PValueBase() {
+      @Override
+      public String getName() {
+        return PVALUE_NAME;
+      }
+    };
+  }
+
+  /** Returns a vertex with the given name whose processor is {@link TezDoFnProcessor}. */
+  private static Vertex newDoFnVertex(String vertexName) {
+    String processorClass = TezDoFnProcessor.class.getName();
+    return Vertex.create(vertexName, ProcessorDescriptor.create(processorClass));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TranslatorUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TranslatorUtilTest.java b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TranslatorUtilTest.java
new file mode 100644
index 0000000..b2bb5fc
--- /dev/null
+++ b/runners/tez/src/test/java/org/apache/beam/runners/tez/translation/TranslatorUtilTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.tez.translation;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the Tez {@link TranslatorUtil} class.
+ */
+public class TranslatorUtilTest {
+
+  /**
+   * Serializes a DoFn to a string and back, and checks that the restored object has the same
+   * class as the original.
+   *
+   * @throws Exception if serialization or deserialization fails
+   */
+  @Test
+  public void testDoFnSerialization() throws Exception {
+    DoFn<?, ?> original = new NoOpDoFn();
+    String serialized = TranslatorUtil.toString(original);
+    DoFn<?, ?> restored = (DoFn<?, ?>) TranslatorUtil.fromString(serialized);
+    // JUnit expects (expected, actual); the original instance defines the expectation.
+    Assert.assertEquals(original.getClass(), restored.getClass());
+  }
+
+  /** Minimal DoFn with an empty process method, used only as a serialization subject. */
+  private static class NoOpDoFn extends DoFn<String, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      // Intentionally empty: only the type needs to survive the round trip.
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8994f07e/runners/tez/src/test/resources/test_input.txt
----------------------------------------------------------------------
diff --git a/runners/tez/src/test/resources/test_input.txt b/runners/tez/src/test/resources/test_input.txt
new file mode 100644
index 0000000..383be42
--- /dev/null
+++ b/runners/tez/src/test/resources/test_input.txt
@@ -0,0 +1,2 @@
+This is an example of splitting up lines.
+Then formatting those lines into individual tokens.
\ No newline at end of file

Reply via email to