kramasamy closed pull request #2826: [Streamlet Scala API] Add Scala Streamlet 
Integration Tests Part I
URL: https://github.com/apache/incubator-heron/pull/2826
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/.gitignore b/.gitignore
index 927928bbd5..12fbf25f85 100644
--- a/.gitignore
+++ b/.gitignore
@@ -132,3 +132,6 @@ website/public/
 
 # Visual Studio Code
 .vscode
+
+# integration_test
+results/
\ No newline at end of file
diff --git 
a/heron/api/src/java/com/twitter/heron/streamlet/impl/BuilderImpl.java 
b/heron/api/src/java/com/twitter/heron/streamlet/impl/BuilderImpl.java
index e00eb14814..b982453aa8 100644
--- a/heron/api/src/java/com/twitter/heron/streamlet/impl/BuilderImpl.java
+++ b/heron/api/src/java/com/twitter/heron/streamlet/impl/BuilderImpl.java
@@ -11,10 +11,8 @@
 //  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 //  See the License for the specific language governing permissions and
 //  limitations under the License.
-
 package com.twitter.heron.streamlet.impl;
 
-
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -60,6 +58,10 @@ public BuilderImpl() {
    */
   public TopologyBuilder build() {
     TopologyBuilder builder = new TopologyBuilder();
+    return build(builder);
+  }
+
+  public TopologyBuilder build(TopologyBuilder builder) {
     Set<String> stageNames = new HashSet<>();
     for (StreamletImpl<?> streamlet : sources) {
       streamlet.build(builder, stageNames);
diff --git 
a/heron/api/src/scala/com/twitter/heron/streamlet/scala/SerializableTransformer.scala
 
b/heron/api/src/scala/com/twitter/heron/streamlet/scala/SerializableTransformer.scala
index 7867448989..7bbbdd282b 100644
--- 
a/heron/api/src/scala/com/twitter/heron/streamlet/scala/SerializableTransformer.scala
+++ 
b/heron/api/src/scala/com/twitter/heron/streamlet/scala/SerializableTransformer.scala
@@ -13,8 +13,6 @@
 //  limitations under the License.
 package com.twitter.heron.streamlet.scala
 
-import java.io.Serializable
-
 import com.twitter.heron.streamlet.Context
 
 /**
diff --git 
a/heron/api/src/scala/com/twitter/heron/streamlet/scala/impl/BuilderImpl.scala 
b/heron/api/src/scala/com/twitter/heron/streamlet/scala/impl/BuilderImpl.scala
index d6cd4c84a9..4458d70210 100644
--- 
a/heron/api/src/scala/com/twitter/heron/streamlet/scala/impl/BuilderImpl.scala
+++ 
b/heron/api/src/scala/com/twitter/heron/streamlet/scala/impl/BuilderImpl.scala
@@ -37,4 +37,7 @@ class BuilderImpl(builder: 
com.twitter.heron.streamlet.Builder)
   def build(): TopologyBuilder =
     builder.asInstanceOf[JavaBuilderImpl].build()
 
+  def build(topologyBuilder: TopologyBuilder): TopologyBuilder =
+    builder.asInstanceOf[JavaBuilderImpl].build(topologyBuilder)
+
 }
diff --git a/integration_test/src/python/test_runner/main.py 
b/integration_test/src/python/test_runner/main.py
index 861a951899..f5d4d1cad1 100644
--- a/integration_test/src/python/test_runner/main.py
+++ b/integration_test/src/python/test_runner/main.py
@@ -293,11 +293,15 @@ def run_tests(conf, args):
 
   http_server_host_port = "%s:%d" % (args.http_server_hostname, 
args.http_server_port)
 
-  if args.tests_bin_path.endswith(".jar"):
+  if args.tests_bin_path.endswith("scala-integration-tests.jar"):
+    test_topologies = filter_test_topologies(conf["scalaTopologies"], 
args.test_topology_pattern)
+    topology_classpath_prefix = conf["topologyClasspathPrefix"]
+    extra_topology_args = "-s http://%s/state"; % http_server_host_port
+  elif args.tests_bin_path.endswith("integration-tests.jar"):
     test_topologies = filter_test_topologies(conf["javaTopologies"], 
args.test_topology_pattern)
     topology_classpath_prefix = conf["topologyClasspathPrefix"]
     extra_topology_args = "-s http://%s/state"; % http_server_host_port
-  elif args.tests_bin_path.endswith(".pex"):
+  elif args.tests_bin_path.endswith("heron_integ_topology.pex"):
     test_topologies = filter_test_topologies(conf["pythonTopologies"], 
args.test_topology_pattern)
     topology_classpath_prefix = ""
     extra_topology_args = ""
diff --git a/integration_test/src/python/test_runner/resources/test.json 
b/integration_test/src/python/test_runner/resources/test.json
index 049c58c80c..ba0addd488 100644
--- a/integration_test/src/python/test_runner/resources/test.json
+++ b/integration_test/src/python/test_runner/resources/test.json
@@ -7,6 +7,13 @@
   "cliConfigPath" : "$HOME/.heron/conf",
   "topologyClasspathPrefix" : "com.twitter.heron.integration_test.topology.",
   "releasePackageUri" : "scheme://role/name/version",
+  "scalaTopologies": [
+    {
+      "topologyName" : "IntegrationTest_ScalaStreamletWithFilterAndTransform",
+      "classPath"    : 
"scala_streamlet_with_filter_and_transform.ScalaStreamletWithFilterAndTransform",
+      "expectedResultRelativePath" : 
"scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json"
+    }
+  ],
   "javaTopologies": [
     {
       "topologyName" : "IntegrationTest_FieldsGrouping",
diff --git a/integration_test/src/scala/BUILD b/integration_test/src/scala/BUILD
new file mode 100644
index 0000000000..48b7d813d0
--- /dev/null
+++ b/integration_test/src/scala/BUILD
@@ -0,0 +1,28 @@
+licenses(["notice"])
+
+package(default_visibility = ["//visibility:public"])
+
+filegroup(
+    name = "test-data-files",
+    srcs = glob(["**/*.json"]),
+)
+
+scala_binary(
+    name = "scala-integration-tests-unshaded",
+    srcs = glob(["com/twitter/heron/integration_test/**/*.scala"]),
+    deps = [
+        "//heron/api/src/java:api-java",
+        "//heron/api/src/scala:api-scala",
+        "//integration_test/src/java:common",
+        "//integration_test/src/java:core",
+        "//heron/api/src/java:api-java-low-level"
+    ],
+    main_class = 
"com.twitter.heron.integration_test.topology.scala_streamlet_with_filter_and_transform.ScalaStreamletWithFilterAndTransform"
+)
+
+genrule(
+    name = 'scala-integration-tests',
+    srcs = [":scala-integration-tests-unshaded_deploy.jar"],
+    outs = ["scala-integration-tests.jar"],
+    cmd  = "cp $< $@"
+)
\ No newline at end of file
diff --git 
a/integration_test/src/scala/com/twitter/heron/integration_test/common/ScalaIntegrationTestBase.scala
 
b/integration_test/src/scala/com/twitter/heron/integration_test/common/ScalaIntegrationTestBase.scala
new file mode 100644
index 0000000000..9563a1f077
--- /dev/null
+++ 
b/integration_test/src/scala/com/twitter/heron/integration_test/common/ScalaIntegrationTestBase.scala
@@ -0,0 +1,32 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package com.twitter.heron.integration_test.common
+
+import com.twitter.heron.integration_test.core.TestTopologyBuilder
+import com.twitter.heron.streamlet.scala.Builder
+import com.twitter.heron.streamlet.scala.impl.BuilderImpl
+
+/**
+  * Scala Integration Test Base
+  */
+trait ScalaIntegrationTestBase extends Serializable {
+
+  protected def build(testTopologyBuilder: TestTopologyBuilder,
+                      streamletBuilder: Builder): TestTopologyBuilder = {
+    val streamletBuilderImpl = streamletBuilder.asInstanceOf[BuilderImpl]
+    val topologyBuilder = streamletBuilderImpl.build(testTopologyBuilder)
+    topologyBuilder.asInstanceOf[TestTopologyBuilder]
+  }
+
+}
diff --git 
a/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
 
b/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
new file mode 100644
index 0000000000..3408032828
--- /dev/null
+++ 
b/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala
@@ -0,0 +1,71 @@
+//  Copyright 2018 Twitter. All rights reserved.
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+package 
com.twitter.heron.integration_test.topology.scala_streamlet_with_filter_and_transform
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import com.twitter.heron.api.Config
+import com.twitter.heron.integration_test.common.{
+  AbstractTestTopology,
+  ScalaIntegrationTestBase
+}
+import com.twitter.heron.integration_test.core.TestTopologyBuilder
+import com.twitter.heron.streamlet.Context
+import com.twitter.heron.streamlet.scala.{Builder, SerializableTransformer}
+
+object ScalaStreamletWithFilterAndTransform {
+  def main(args: Array[String]): Unit = {
+    val conf = new Config
+    val topology = new ScalaStreamletWithFilterAndTransform(args)
+    topology.submit(conf)
+  }
+}
+
+/**
+  * Scala Streamlet Integration Test
+  */
+@SerialVersionUID(-7280407024398984674L)
+class ScalaStreamletWithFilterAndTransform(args: Array[String])
+    extends AbstractTestTopology(args)
+    with ScalaIntegrationTestBase {
+
+  override protected def buildTopology(
+      testTopologyBuilder: TestTopologyBuilder): TestTopologyBuilder = {
+    val atomicInteger = new AtomicInteger
+
+    val streamletBuilder = Builder.newBuilder
+
+    streamletBuilder
+      .newSource(() => atomicInteger.getAndIncrement())
+      .setName("incremented-numbers")
+      .filter((i: Int) => i <= 7)
+      .setName("positive-numbers-lower-than-8")
+      .transform[String](new TextTransformer())
+      .setName("numbers-transformed-to-text")
+
+    build(testTopologyBuilder, streamletBuilder)
+  }
+
+}
+
+private class TextTransformer extends SerializableTransformer[Int, String] {
+  private val alphabet = List("a", "b", "c", "d", "e", "f", "g", "h")
+
+  override def setup(context: Context): Unit = {}
+
+  override def transform(i: Int, fun: String => Unit): Unit =
+    fun(s"${alphabet(i)}-$i".toUpperCase)
+
+  override def cleanup(): Unit = {}
+}
diff --git 
a/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json
 
b/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json
new file mode 100644
index 0000000000..dda72569e2
--- /dev/null
+++ 
b/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json
@@ -0,0 +1 @@
+["A-0", "B-1", "C-2", "D-3", "E-4", "F-5", "G-6", "H-7"]
\ No newline at end of file
diff --git a/scripts/applatix/javatests.sh b/scripts/applatix/javatests.sh
index 0744aaaa09..e19d5df108 100755
--- a/scripts/applatix/javatests.sh
+++ b/scripts/applatix/javatests.sh
@@ -9,14 +9,29 @@ source ${DIR}/testutils.sh
 
 # integration test binaries have to be specified as absolute path
 JAVA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/integration-tests.jar"
+SCALA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/scala-integration-tests.jar"
 
-# run the java integration test
-T="heron integration_test java"
+# initialize http-server for integration tests
+T="heron integration_test http-server initialization"
 start_timer "$T"
 ${HOME}/bin/http-server 8080 &
 http_server_id=$!
 trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT
+end_timer "$T"
+
+# run the scala integration test
+T="heron integration_test scala"
+start_timer "$T"
+${HOME}/bin/test-runner \
+  -hc heron -tb ${SCALA_INTEGRATION_TESTS_BIN} \
+  -rh localhost -rp 8080\
+  -tp ${HOME}/.herontests/data/scala \
+  -cl local -rl heron-staging -ev devel
+end_timer "$T"
 
+# run the java integration test
+T="heron integration_test java"
+start_timer "$T"
 ${HOME}/bin/test-runner \
   -hc heron -tb ${JAVA_INTEGRATION_TESTS_BIN} \
   -rh localhost -rp 8080\
diff --git a/scripts/applatix/test.sh b/scripts/applatix/test.sh
index c346deaede..8f7391de1f 100755
--- a/scripts/applatix/test.sh
+++ b/scripts/applatix/test.sh
@@ -18,6 +18,7 @@ export PATH=${HOME}/bin:$PATH
 # integration test binaries have to be specified as absolute path
 JAVA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/integration-tests.jar"
 PYTHON_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/heron_integ_topology.pex"
+SCALA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/scala-integration-tests.jar"
 
 # install client
 T="heron client install"
@@ -37,13 +38,27 @@ start_timer "$T"
 python ${UTILS}/save-logs.py "heron_tests_install.txt" 
./heron-tests-install.sh --user
 end_timer "$T"
 
-# run the java integration test
-T="heron integration_test java"
+# initialize http-server for integration tests
+T="heron integration_test http-server initialization"
 start_timer "$T"
 ${HOME}/bin/http-server 8080 &
 http_server_id=$!
 trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT
+end_timer "$T"
+
+# run the scala integration test
+T="heron integration_test scala"
+start_timer "$T"
+${HOME}/bin/test-runner \
+  -hc heron -tb ${SCALA_INTEGRATION_TESTS_BIN} \
+  -rh localhost -rp 8080\
+  -tp ${HOME}/.herontests/data/scala \
+  -cl local -rl heron-staging -ev devel
+end_timer "$T"
 
+# run the java integration test
+T="heron integration_test java"
+start_timer "$T"
 ${HOME}/bin/test-runner \
   -hc heron -tb ${JAVA_INTEGRATION_TESTS_BIN} \
   -rh localhost -rp 8080\
diff --git a/scripts/packages/BUILD b/scripts/packages/BUILD
index facfb2b1e9..eada6e293e 100644
--- a/scripts/packages/BUILD
+++ b/scripts/packages/BUILD
@@ -343,6 +343,15 @@ pkg_tar(
     ]
 )
 
+pkg_tar(
+    name = "heron-tests-data-scala",
+    package_dir = "data/scala",
+    srcs = [
+        "//integration_test/src/scala:test-data-files",
+    ],
+    strip_prefix = 
'/integration_test/src/scala/com/twitter/heron/integration_test/topology/'
+)
+
 pkg_tar(
     name = "heron-tests-data-java",
     package_dir = "data/java",
@@ -365,6 +374,7 @@ pkg_tar(
     name = "heron-tests-lib",
     package_dir = "lib",
     srcs = [
+       "//integration_test/src/scala:scala-integration-tests",
        "//integration_test/src/java:integration-tests",
        
"//integration_test/src/python/integration_test/topology:heron_integ_topology",
     ],
@@ -376,6 +386,7 @@ pkg_tar(
     srcs = generated_release_files,
     deps = [
         ":heron-tests-bin",
+        ":heron-tests-data-scala",
         ":heron-tests-data-java",
         ":heron-tests-data-python",
         ":heron-tests-lib",
diff --git a/scripts/run_integration_test.sh b/scripts/run_integration_test.sh
index 903f879aff..d70881b9d2 100755
--- a/scripts/run_integration_test.sh
+++ b/scripts/run_integration_test.sh
@@ -8,10 +8,12 @@ 
TEST_RUNNER="./bazel-bin/integration_test/src/python/test_runner/test-runner.pex
 
 
JAVA_TESTS_DIR="integration_test/src/java/com/twitter/heron/integration_test/topology"
 PYTHON_TESTS_DIR="integration_test/src/python/integration_test/topology"
+SCALA_TESTS_DIR="integration_test/src/scala/com/twitter/heron/integration_test/topology"
 
 # integration test binaries have to be specified as absolute path
 
JAVA_INTEGRATION_TESTS_BIN="${PWD}/bazel-genfiles/integration_test/src/java/integration-tests.jar"
 
PYTHON_INTEGRATION_TESTS_BIN="${PWD}/bazel-bin/integration_test/src/python/integration_test/topology/heron_integ_topology.pex"
+SCALA_INTEGRATION_TESTS_BIN="${PWD}/bazel-genfiles/integration_test/src/scala/scala-integration-tests.jar"
 
 CORE_PKG="file://${PWD}/bazel-bin/scripts/packages/heron-core.tar.gz"
 
@@ -28,6 +30,13 @@ ${HTTP_SERVER} 8080 &
 http_server_id=$!
 trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT
 
+# run the scala integration tests
+${TEST_RUNNER} \
+  -hc ~/.heron/bin/heron -tb ${SCALA_INTEGRATION_TESTS_BIN} \
+  -rh localhost -rp 8080 \
+  -tp ${SCALA_TESTS_DIR} \
+  -cl local -rl heron-staging -ev devel -pi ${CORE_PKG}
+
 # run the java integration tests
 ${TEST_RUNNER} \
   -hc ~/.heron/bin/heron -tb ${JAVA_INTEGRATION_TESTS_BIN} \
diff --git a/scripts/travis/test.sh b/scripts/travis/test.sh
index 420a56f676..2cece6059d 100755
--- a/scripts/travis/test.sh
+++ b/scripts/travis/test.sh
@@ -15,6 +15,7 @@ echo "Using $PLATFORM platform"
 # integration test binaries have to be specified as absolute path
 JAVA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/integration-tests.jar"
 PYTHON_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/heron_integ_topology.pex"
+SCALA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/scala-integration-tests.jar"
 
 # build test related jar
 T="heron build integration_test"
@@ -40,13 +41,27 @@ start_timer "$T"
 python 
./bazel-bin/integration_test/src/python/local_test_runner/local-test-runner
 end_timer "$T"
 
-# run the java integration test
-T="heron integration_test java"
+# initialize http-server for integration tests
+T="heron integration_test http-server initialization"
 start_timer "$T"
 ${HOME}/bin/http-server 8080 &
 http_server_id=$!
 trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT
+end_timer "$T"
+
+# run the scala integration test
+T="heron integration_test scala"
+start_timer "$T"
+${HOME}/bin/test-runner \
+  -hc heron -tb ${SCALA_INTEGRATION_TESTS_BIN} \
+  -rh localhost -rp 8080\
+  -tp ${HOME}/.herontests/data/scala \
+  -cl local -rl heron-staging -ev devel
+end_timer "$T"
 
+# run the java integration test
+T="heron integration_test java"
+start_timer "$T"
 ${HOME}/bin/test-runner \
   -hc heron -tb ${JAVA_INTEGRATION_TESTS_BIN} \
   -rh localhost -rp 8080\


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to