kramasamy closed pull request #2826: [Streamlet Scala API] Add Scala Streamlet Integration Tests Part I URL: https://github.com/apache/incubator-heron/pull/2826
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/.gitignore b/.gitignore index 927928bbd5..12fbf25f85 100644 --- a/.gitignore +++ b/.gitignore @@ -132,3 +132,6 @@ website/public/ # Visual Studio Code .vscode + +# integration_test +results/ \ No newline at end of file diff --git a/heron/api/src/java/com/twitter/heron/streamlet/impl/BuilderImpl.java b/heron/api/src/java/com/twitter/heron/streamlet/impl/BuilderImpl.java index e00eb14814..b982453aa8 100644 --- a/heron/api/src/java/com/twitter/heron/streamlet/impl/BuilderImpl.java +++ b/heron/api/src/java/com/twitter/heron/streamlet/impl/BuilderImpl.java @@ -11,10 +11,8 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - package com.twitter.heron.streamlet.impl; - import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -60,6 +58,10 @@ public BuilderImpl() { */ public TopologyBuilder build() { TopologyBuilder builder = new TopologyBuilder(); + return build(builder); + } + + public TopologyBuilder build(TopologyBuilder builder) { Set<String> stageNames = new HashSet<>(); for (StreamletImpl<?> streamlet : sources) { streamlet.build(builder, stageNames); diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/scala/SerializableTransformer.scala b/heron/api/src/scala/com/twitter/heron/streamlet/scala/SerializableTransformer.scala index 7867448989..7bbbdd282b 100644 --- a/heron/api/src/scala/com/twitter/heron/streamlet/scala/SerializableTransformer.scala +++ b/heron/api/src/scala/com/twitter/heron/streamlet/scala/SerializableTransformer.scala @@ -13,8 +13,6 @@ // limitations under the License. package com.twitter.heron.streamlet.scala -import java.io.Serializable - import com.twitter.heron.streamlet.Context /** diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/scala/impl/BuilderImpl.scala b/heron/api/src/scala/com/twitter/heron/streamlet/scala/impl/BuilderImpl.scala index d6cd4c84a9..4458d70210 100644 --- a/heron/api/src/scala/com/twitter/heron/streamlet/scala/impl/BuilderImpl.scala +++ b/heron/api/src/scala/com/twitter/heron/streamlet/scala/impl/BuilderImpl.scala @@ -37,4 +37,7 @@ class BuilderImpl(builder: com.twitter.heron.streamlet.Builder) def build(): TopologyBuilder = builder.asInstanceOf[JavaBuilderImpl].build() + def build(topologyBuilder: TopologyBuilder): TopologyBuilder = + builder.asInstanceOf[JavaBuilderImpl].build(topologyBuilder) + } diff --git a/integration_test/src/python/test_runner/main.py b/integration_test/src/python/test_runner/main.py index 861a951899..f5d4d1cad1 100644 --- a/integration_test/src/python/test_runner/main.py +++ b/integration_test/src/python/test_runner/main.py @@ -293,11 +293,15 @@ def run_tests(conf, args): http_server_host_port = "%s:%d" % (args.http_server_hostname, args.http_server_port) - if args.tests_bin_path.endswith(".jar"): + if args.tests_bin_path.endswith("scala-integration-tests.jar"): + test_topologies = filter_test_topologies(conf["scalaTopologies"], args.test_topology_pattern) + topology_classpath_prefix = conf["topologyClasspathPrefix"] + extra_topology_args = "-s http://%s/state" % http_server_host_port + elif args.tests_bin_path.endswith("integration-tests.jar"): test_topologies = filter_test_topologies(conf["javaTopologies"], args.test_topology_pattern) topology_classpath_prefix = conf["topologyClasspathPrefix"] extra_topology_args = "-s http://%s/state" % http_server_host_port - elif args.tests_bin_path.endswith(".pex"): + elif args.tests_bin_path.endswith("heron_integ_topology.pex"): test_topologies = filter_test_topologies(conf["pythonTopologies"], args.test_topology_pattern) topology_classpath_prefix = "" extra_topology_args = "" diff --git a/integration_test/src/python/test_runner/resources/test.json b/integration_test/src/python/test_runner/resources/test.json index 049c58c80c..ba0addd488 100644 --- a/integration_test/src/python/test_runner/resources/test.json +++ b/integration_test/src/python/test_runner/resources/test.json @@ -7,6 +7,13 @@ "cliConfigPath" : "$HOME/.heron/conf", "topologyClasspathPrefix" : "com.twitter.heron.integration_test.topology.", "releasePackageUri" : "scheme://role/name/version", + "scalaTopologies": [ + { + "topologyName" : "IntegrationTest_ScalaStreamletWithFilterAndTransform", + "classPath" : "scala_streamlet_with_filter_and_transform.ScalaStreamletWithFilterAndTransform", + "expectedResultRelativePath" : "scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json" + } + ], "javaTopologies": [ { "topologyName" : "IntegrationTest_FieldsGrouping", diff --git a/integration_test/src/scala/BUILD b/integration_test/src/scala/BUILD new file mode 100644 index 0000000000..48b7d813d0 --- /dev/null +++ b/integration_test/src/scala/BUILD @@ -0,0 +1,28 @@ +licenses(["notice"]) + +package(default_visibility = ["//visibility:public"]) + +filegroup( + name = "test-data-files", + srcs = glob(["**/*.json"]), +) + +scala_binary( + name = "scala-integration-tests-unshaded", + srcs = glob(["com/twitter/heron/integration_test/**/*.scala"]), + deps = [ + "//heron/api/src/java:api-java", + "//heron/api/src/scala:api-scala", + "//integration_test/src/java:common", + "//integration_test/src/java:core", + "//heron/api/src/java:api-java-low-level" + ], + main_class = "com.twitter.heron.integration_test.topology.scala_streamlet_with_filter_and_transform.ScalaStreamletWithFilterAndTransform" +) + +genrule( + name = 'scala-integration-tests', + srcs = [":scala-integration-tests-unshaded_deploy.jar"], + outs = ["scala-integration-tests.jar"], + cmd = "cp $< $@" +) \ No newline at end of file diff --git a/integration_test/src/scala/com/twitter/heron/integration_test/common/ScalaIntegrationTestBase.scala b/integration_test/src/scala/com/twitter/heron/integration_test/common/ScalaIntegrationTestBase.scala new file mode 100644 index 0000000000..9563a1f077 --- /dev/null +++ b/integration_test/src/scala/com/twitter/heron/integration_test/common/ScalaIntegrationTestBase.scala @@ -0,0 +1,32 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.integration_test.common + +import com.twitter.heron.integration_test.core.TestTopologyBuilder +import com.twitter.heron.streamlet.scala.Builder +import com.twitter.heron.streamlet.scala.impl.BuilderImpl + +/** + * Scala Integration Test Base + */ +trait ScalaIntegrationTestBase extends Serializable { + + protected def build(testTopologyBuilder: TestTopologyBuilder, + streamletBuilder: Builder): TestTopologyBuilder = { + val streamletBuilderImpl = streamletBuilder.asInstanceOf[BuilderImpl] + val topologyBuilder = streamletBuilderImpl.build(testTopologyBuilder) + topologyBuilder.asInstanceOf[TestTopologyBuilder] + } + +} diff --git a/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala b/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala new file mode 100644 index 0000000000..3408032828 --- /dev/null +++ b/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransform.scala @@ -0,0 +1,71 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.integration_test.topology.scala_streamlet_with_filter_and_transform + +import java.util.concurrent.atomic.AtomicInteger + +import com.twitter.heron.api.Config +import com.twitter.heron.integration_test.common.{ + AbstractTestTopology, + ScalaIntegrationTestBase +} +import com.twitter.heron.integration_test.core.TestTopologyBuilder +import com.twitter.heron.streamlet.Context +import com.twitter.heron.streamlet.scala.{Builder, SerializableTransformer} + +object ScalaStreamletWithFilterAndTransform { + def main(args: Array[String]): Unit = { + val conf = new Config + val topology = new ScalaStreamletWithFilterAndTransform(args) + topology.submit(conf) + } +} + +/** + * Scala Streamlet Integration Test + */ +@SerialVersionUID(-7280407024398984674L) +class ScalaStreamletWithFilterAndTransform(args: Array[String]) + extends AbstractTestTopology(args) + with ScalaIntegrationTestBase { + + override protected def buildTopology( + testTopologyBuilder: TestTopologyBuilder): TestTopologyBuilder = { + val atomicInteger = new AtomicInteger + + val streamletBuilder = Builder.newBuilder + + streamletBuilder + .newSource(() => atomicInteger.getAndIncrement()) + .setName("incremented-numbers") + .filter((i: Int) => i <= 7) + .setName("positive-numbers-lower-than-8") + .transform[String](new TextTransformer()) + .setName("numbers-transformed-to-text") + + build(testTopologyBuilder, streamletBuilder) + } + +} + +private class TextTransformer extends SerializableTransformer[Int, String] { + private val alphabet = List("a", "b", "c", "d", "e", "f", "g", "h") + + override def setup(context: Context): Unit = {} + + override def transform(i: Int, fun: String => Unit): Unit = + fun(s"${alphabet(i)}-$i".toUpperCase) + + override def cleanup(): Unit = {} +} diff --git a/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json b/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json new file mode 100644 index 0000000000..dda72569e2 --- /dev/null +++ b/integration_test/src/scala/com/twitter/heron/integration_test/topology/scala_streamlet_with_filter_and_transform/ScalaStreamletWithFilterAndTransformResults.json @@ -0,0 +1 @@ +["A-0", "B-1", "C-2", "D-3", "E-4", "F-5", "G-6", "H-7"] \ No newline at end of file diff --git a/scripts/applatix/javatests.sh b/scripts/applatix/javatests.sh index 0744aaaa09..e19d5df108 100755 --- a/scripts/applatix/javatests.sh +++ b/scripts/applatix/javatests.sh @@ -9,14 +9,29 @@ source ${DIR}/testutils.sh # integration test binaries have to be specified as absolute path JAVA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/integration-tests.jar" +SCALA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/scala-integration-tests.jar" -# run the java integration test -T="heron integration_test java" +# initialize http-server for integration tests +T="heron integration_test http-server initialization" start_timer "$T" ${HOME}/bin/http-server 8080 & http_server_id=$! trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT +end_timer "$T" + +# run the scala integration test +T="heron integration_test scala" +start_timer "$T" +${HOME}/bin/test-runner \ + -hc heron -tb ${SCALA_INTEGRATION_TESTS_BIN} \ + -rh localhost -rp 8080\ + -tp ${HOME}/.herontests/data/scala \ + -cl local -rl heron-staging -ev devel +end_timer "$T" +# run the java integration test +T="heron integration_test java" +start_timer "$T" ${HOME}/bin/test-runner \ -hc heron -tb ${JAVA_INTEGRATION_TESTS_BIN} \ -rh localhost -rp 8080\ diff --git a/scripts/applatix/test.sh b/scripts/applatix/test.sh index c346deaede..8f7391de1f 100755 --- a/scripts/applatix/test.sh +++ b/scripts/applatix/test.sh @@ -18,6 +18,7 @@ export PATH=${HOME}/bin:$PATH # integration test binaries have to be specified as absolute path JAVA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/integration-tests.jar" PYTHON_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/heron_integ_topology.pex" +SCALA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/scala-integration-tests.jar" # install client T="heron client install" @@ -37,13 +38,27 @@ start_timer "$T" python ${UTILS}/save-logs.py "heron_tests_install.txt" ./heron-tests-install.sh --user end_timer "$T" -# run the java integration test -T="heron integration_test java" +# initialize http-server for integration tests +T="heron integration_test http-server initialization" start_timer "$T" ${HOME}/bin/http-server 8080 & http_server_id=$! trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT +end_timer "$T" + +# run the scala integration test +T="heron integration_test scala" +start_timer "$T" +${HOME}/bin/test-runner \ + -hc heron -tb ${SCALA_INTEGRATION_TESTS_BIN} \ + -rh localhost -rp 8080\ + -tp ${HOME}/.herontests/data/scala \ + -cl local -rl heron-staging -ev devel +end_timer "$T" +# run the java integration test +T="heron integration_test java" +start_timer "$T" ${HOME}/bin/test-runner \ -hc heron -tb ${JAVA_INTEGRATION_TESTS_BIN} \ -rh localhost -rp 8080\ diff --git a/scripts/packages/BUILD b/scripts/packages/BUILD index facfb2b1e9..eada6e293e 100644 --- a/scripts/packages/BUILD +++ b/scripts/packages/BUILD @@ -343,6 +343,15 @@ pkg_tar( ] ) +pkg_tar( + name = "heron-tests-data-scala", + package_dir = "data/scala", + srcs = [ + "//integration_test/src/scala:test-data-files", + ], + strip_prefix = '/integration_test/src/scala/com/twitter/heron/integration_test/topology/' +) + pkg_tar( name = "heron-tests-data-java", package_dir = "data/java", @@ -365,6 +374,7 @@ pkg_tar( name = "heron-tests-lib", package_dir = "lib", srcs = [ + "//integration_test/src/scala:scala-integration-tests", "//integration_test/src/java:integration-tests", "//integration_test/src/python/integration_test/topology:heron_integ_topology", ], @@ -376,6 +386,7 @@ pkg_tar( srcs = generated_release_files, deps = [ ":heron-tests-bin", + ":heron-tests-data-scala", ":heron-tests-data-java", ":heron-tests-data-python", ":heron-tests-lib", diff --git a/scripts/run_integration_test.sh b/scripts/run_integration_test.sh index 903f879aff..d70881b9d2 100755 --- a/scripts/run_integration_test.sh +++ b/scripts/run_integration_test.sh @@ -8,10 +8,12 @@ TEST_RUNNER="./bazel-bin/integration_test/src/python/test_runner/test-runner.pex JAVA_TESTS_DIR="integration_test/src/java/com/twitter/heron/integration_test/topology" PYTHON_TESTS_DIR="integration_test/src/python/integration_test/topology" +SCALA_TESTS_DIR="integration_test/src/scala/com/twitter/heron/integration_test/topology" # integration test binaries have to be specified as absolute path JAVA_INTEGRATION_TESTS_BIN="${PWD}/bazel-genfiles/integration_test/src/java/integration-tests.jar" PYTHON_INTEGRATION_TESTS_BIN="${PWD}/bazel-bin/integration_test/src/python/integration_test/topology/heron_integ_topology.pex" +SCALA_INTEGRATION_TESTS_BIN="${PWD}/bazel-genfiles/integration_test/src/scala/scala-integration-tests.jar" CORE_PKG="file://${PWD}/bazel-bin/scripts/packages/heron-core.tar.gz" @@ -28,6 +30,13 @@ ${HTTP_SERVER} 8080 & http_server_id=$! trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT +# run the scala integration tests +${TEST_RUNNER} \ + -hc ~/.heron/bin/heron -tb ${SCALA_INTEGRATION_TESTS_BIN} \ + -rh localhost -rp 8080 \ + -tp ${SCALA_TESTS_DIR} \ + -cl local -rl heron-staging -ev devel -pi ${CORE_PKG} + # run the java integration tests ${TEST_RUNNER} \ -hc ~/.heron/bin/heron -tb ${JAVA_INTEGRATION_TESTS_BIN} \ diff --git a/scripts/travis/test.sh b/scripts/travis/test.sh index 420a56f676..2cece6059d 100755 --- a/scripts/travis/test.sh +++ b/scripts/travis/test.sh @@ -15,6 +15,7 @@ echo "Using $PLATFORM platform" # integration test binaries have to be specified as absolute path JAVA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/integration-tests.jar" PYTHON_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/heron_integ_topology.pex" +SCALA_INTEGRATION_TESTS_BIN="${HOME}/.herontests/lib/scala-integration-tests.jar" # build test related jar T="heron build integration_test" @@ -40,13 +41,27 @@ start_timer "$T" python ./bazel-bin/integration_test/src/python/local_test_runner/local-test-runner end_timer "$T" -# run the java integration test -T="heron integration_test java" +# initialize http-server for integration tests +T="heron integration_test http-server initialization" start_timer "$T" ${HOME}/bin/http-server 8080 & http_server_id=$! trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT +end_timer "$T" + +# run the scala integration test +T="heron integration_test scala" +start_timer "$T" +${HOME}/bin/test-runner \ + -hc heron -tb ${SCALA_INTEGRATION_TESTS_BIN} \ + -rh localhost -rp 8080\ + -tp ${HOME}/.herontests/data/scala \ + -cl local -rl heron-staging -ev devel +end_timer "$T" +# run the java integration test +T="heron integration_test java" +start_timer "$T" ${HOME}/bin/test-runner \ -hc heron -tb ${JAVA_INTEGRATION_TESTS_BIN} \ -rh localhost -rp 8080\ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services