Repository: bahir

Updated Branches:
  refs/heads/master 5cfd7ac31 -> c51853d13
[BAHIR-107] Upgrade to Scala 2.12 and Spark 2.4.0

Closes #76

Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/c51853d1
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/c51853d1
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/c51853d1

Branch: refs/heads/master
Commit: c51853d135ad2d9da67804259f4ed0e29223afb3
Parents: 5cfd7ac
Author: Lukasz Antoniak <[email protected]>
Authored: Tue Dec 11 06:57:46 2018 -0800
Committer: Luciano Resende <[email protected]>
Committed: Mon Jan 28 09:08:09 2019 -0800

----------------------------------------------------------------------
 common/pom.xml                                  |  4 +-
 dev/change-scala-version.sh                     | 13 +--
 dev/release-build.sh                            | 97 +++++++++++---
 distribution/pom.xml                            |  4 +-
 pom.xml                                         | 76 ++++++++-------
 sql-cloudant/README.md                          |  2 +-
 sql-cloudant/pom.xml                            |  4 +-
 .../apache/bahir/cloudant/DefaultSource.scala   |  2 +-
 sql-streaming-akka/README.md                    |  2 +-
 sql-streaming-akka/pom.xml                      |  4 +-
 .../sql/streaming/akka/AkkaStreamSource.scala   | 19 ++--
 sql-streaming-mqtt/README.md                    |  2 +-
 sql-streaming-mqtt/pom.xml                      |  4 +-
 .../sql/streaming/mqtt/CachedMQTTClient.scala   | 12 ++-
 .../sql/streaming/mqtt/MQTTStreamSink.scala     | 94 ++++++++-----------
 .../sql/streaming/mqtt/MQTTStreamSource.scala   | 15 +--
 .../sql/mqtt/HdfsBasedMQTTStreamSource.scala    |  6 +-
 .../streaming/mqtt/MQTTStreamSinkSuite.scala    | 32 +++----
 .../streaming/mqtt/MQTTStreamSourceSuite.scala  |  9 +-
 streaming-akka/README.md                        |  2 +-
 streaming-akka/pom.xml                          |  4 +-
 streaming-mqtt/README.md                        |  2 +-
 streaming-mqtt/pom.xml                          |  4 +-
 streaming-pubnub/pom.xml                        |  4 +-
 streaming-pubsub/pom.xml                        |  4 +-
 streaming-twitter/README.md                     |  2 +-
 streaming-twitter/pom.xml                       |  6 +-
 streaming-zeromq/README.md                      |  2 +-
 streaming-zeromq/pom.xml                        |  4 +-
 29 files changed, 218 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index d7757bb..0e443ec 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -20,13 +20,13 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.bahir</groupId>
-    <artifactId>bahir-parent_2.11</artifactId>
+    <artifactId>bahir-parent_2.12</artifactId>
     <version>2.4.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

   <groupId>org.apache.bahir</groupId>
-  <artifactId>bahir-common_2.11</artifactId>
+  <artifactId>bahir-common_2.12</artifactId>
   <properties>
     <sbt.project.name>bahir-common</sbt.project.name>
   </properties>

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/dev/change-scala-version.sh
----------------------------------------------------------------------
diff --git a/dev/change-scala-version.sh b/dev/change-scala-version.sh
index 7203ee7..09c97e2 100755
--- a/dev/change-scala-version.sh
+++ b/dev/change-scala-version.sh
@@ -1,4 +1,5 @@
 #!/usr/bin/env bash
+
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.
See the NOTICE file distributed with @@ -18,7 +19,7 @@ set -e -VALID_VERSIONS=( 2.10 2.11 ) +VALID_VERSIONS=( 2.11 2.12 ) usage() { echo "Usage: $(basename $0) [-h|--help] <version> @@ -43,10 +44,10 @@ check_scala_version() { check_scala_version "$TO_VERSION" -if [ $TO_VERSION = "2.11" ]; then - FROM_VERSION="2.10" -else +if [ $TO_VERSION = "2.12" ]; then FROM_VERSION="2.11" +else + FROM_VERSION="2.12" fi sed_i() { @@ -59,7 +60,7 @@ BASEDIR=$(dirname $0)/.. find "$BASEDIR" -name 'pom.xml' -not -path '*target*' -print \ -exec bash -c "sed_i 's/\(artifactId.*\)_'$FROM_VERSION'/\1_'$TO_VERSION'/g' {}" \; -# Also update <scala.binary.version> in parent POM -# Match any scala binary version to ensure idempotency +# also update <scala.binary.version> in parent POM +# match any scala binary version to ensure idempotency sed_i '1,/<scala\.binary\.version>[0-9]*\.[0-9]*</s/<scala\.binary\.version>[0-9]*\.[0-9]*</<scala.binary.version>'$TO_VERSION'</' \ "$BASEDIR/pom.xml" http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/dev/release-build.sh ---------------------------------------------------------------------- diff --git a/dev/release-build.sh b/dev/release-build.sh index a2dc5e2..89f0a52 100755 --- a/dev/release-build.sh +++ b/dev/release-build.sh @@ -38,11 +38,11 @@ to the staging release location. --release-publish --tag="v2.3.0-rc1" Publish the maven artifacts of a release to the Apache staging maven repository. -Note that this will publish both Scala 2.10 and 2.11 artifacts. +Note that this will publish both Scala 2.11 and 2.12 artifacts. --release-snapshot Publish the maven snapshot artifacts to Apache snapshots maven repository -Note that this will publish both Scala 2.10 and 2.11 artifacts. +Note that this will publish both Scala 2.11 and 2.12 artifacts. 
OPTIONS @@ -78,7 +78,7 @@ if [ $# -eq 0 ]; then fi -# Process each provided argument configuration +# process each provided argument configuration while [ "${1+defined}" ]; do IFS="=" read -ra PARTS <<< "$1" case "${PARTS[0]}" in @@ -134,7 +134,7 @@ while [ "${1+defined}" ]; do echo "Error: Unknown option: $1" >&2 exit 1 ;; - *) # No more options + *) # no more options break ;; esac @@ -160,7 +160,7 @@ fi if [[ "$RELEASE_PUBLISH" == "true" ]]; then if [[ "$GIT_REF" && "$GIT_TAG" ]]; then - echo "ERROR: Only one argumented permitted when publishing : --gitCommitHash or --gitTag" + echo "ERROR: Only one argument permitted when publishing : --gitCommitHash or --gitTag" exit_with_usage fi if [[ -z "$GIT_REF" && -z "$GIT_TAG" ]]; then @@ -175,11 +175,11 @@ if [[ "$RELEASE_PUBLISH" == "true" && "$DRY_RUN" ]]; then fi if [[ "$RELEASE_SNAPSHOT" == "true" && "$DRY_RUN" ]]; then - echo "ERROR: --dryRun not supported for --release-publish" + echo "ERROR: --dryRun not supported for --release-snapshot" exit_with_usage fi -# Commit ref to checkout when building +# commit ref to checkout when building GIT_REF=${GIT_REF:-master} if [[ "$RELEASE_PUBLISH" == "true" && "$GIT_TAG" ]]; then GIT_REF="tags/$GIT_TAG" @@ -200,11 +200,10 @@ fi RELEASE_STAGING_LOCATION="https://dist.apache.org/repos/dist/dev/bahir/bahir-spark" - echo " " -echo "-------------------------------------------------------------" -echo "------- Release preparation with the following parameters ---" -echo "-------------------------------------------------------------" +echo "-----------------------------------------------------------------" +echo "------- Release preparation with the following parameters -------" +echo "-----------------------------------------------------------------" echo "Executing ==> $GOAL" echo "Git reference ==> $GIT_REF" echo "release version ==> $RELEASE_VERSION" @@ -220,7 +219,6 @@ echo $RELEASE_STAGING_LOCATION echo " " function checkout_code { - # Checkout code rm -rf target mkdir target cd target @@ -231,33 +229,41 @@ function checkout_code { git_hash=`git rev-parse --short HEAD` echo "Checked out Bahir git hash $git_hash" - cd "$BASE_DIR" #return to base dir + cd "$BASE_DIR" # return to base dir } if [[ "$RELEASE_PREPARE" == "true" ]]; then echo "Preparing release $RELEASE_VERSION" - # Checkout code + # checkout code checkout_code cd target/bahir - # Build and prepare the release - $MVN $PUBLISH_PROFILES release:clean release:prepare $DRY_RUN -Darguments="-Dgpg.passphrase=\"$GPG_PASSPHRASE\" -DskipTests" -DreleaseVersion="$RELEASE_VERSION" -DdevelopmentVersion="$DEVELOPMENT_VERSION" -Dtag="$RELEASE_TAG" + # test with scala 2.11 and 2.12 + ./dev/change-scala-version.sh 2.11 + $MVN $PUBLISH_PROFILES clean test -Dscala-2.11 || exit 1 + ./dev/change-scala-version.sh 2.12 + $MVN $PUBLISH_PROFILES clean test || exit 1 + + # build and prepare the release + $MVN $PUBLISH_PROFILES release:clean release:prepare $DRY_RUN \ + -DskipTests=true -Dgpg.passphrase="$GPG_PASSPHRASE" \ + -DreleaseVersion="$RELEASE_VERSION" -DdevelopmentVersion="$DEVELOPMENT_VERSION" -Dtag="$RELEASE_TAG" - cd .. #exit bahir + cd .. 
# exit bahir if [ -z "$DRY_RUN" ]; then cd "$BASE_DIR/target/bahir" git checkout $RELEASE_TAG git clean -d -f -x - $MVN $PUBLISH_PROFILES clean install -DskiptTests -Darguments="-DskipTests" + $MVN $PUBLISH_PROFILES clean install -DskipTests=true cd "$BASE_DIR/target" svn co $RELEASE_STAGING_LOCATION svn-bahir mkdir -p svn-bahir/$RELEASE_VERSION-$RELEASE_RC cp bahir/distribution/target/*.tar.gz svn-bahir/$RELEASE_VERSION-$RELEASE_RC/ - cp bahir/distribution/target/*.zip svn-bahir/$RELEASE_VERSION-$RELEASE_RC/ + cp bahir/distribution/target/*.zip svn-bahir/$RELEASE_VERSION-$RELEASE_RC/ cd svn-bahir/$RELEASE_VERSION-$RELEASE_RC/ rm -f *.asc @@ -265,49 +271,50 @@ if [[ "$RELEASE_PREPARE" == "true" ]]; then rm -f *.sha* for i in *.zip *.tar.gz; do shasum --algorithm 512 $i > $i.sha512; done - cd .. #exit $RELEASE_VERSION-$RELEASE_RC/ + cd .. # exit $RELEASE_VERSION-$RELEASE_RC svn add $RELEASE_VERSION-$RELEASE_RC/ svn ci -m"Apache Bahir $RELEASE_VERSION-$RELEASE_RC" fi - - cd "$BASE_DIR" #exit target - + cd "$BASE_DIR" # exit target exit 0 fi if [[ "$RELEASE_PUBLISH" == "true" ]]; then echo "Preparing release $RELEASE_VERSION" - # Checkout code + # checkout code checkout_code cd target/bahir - #Deploy default scala 2.11 - mvn $PUBLISH_PROFILES -DaltDeploymentRepository=apache.releases.https::default::https://repository.apache.org/service/local/staging/deploy/maven2 clean package gpg:sign install:install deploy:deploy -DskiptTests -Darguments="-DskipTests" -Dgpg.passphrase=$GPG_PASSPHRASE - - #mvn clean + DEPLOYMENT_REPOSITORY="apache.releases.https::default::https://repository.apache.org/service/local/staging/deploy/maven2" - #Deploy scala 2.10 - #./dev/change-scala-version.sh 2.10 - #mvn $PUBLISH_PROFILES -DaltDeploymentRepository=apache.releases.https::default::https://repository.apache.org/service/local/staging/deploy/maven2 clean package gpg:sign install:install deploy:deploy -DskiptTests -Darguments="-DskipTests" -Dscala-2.10 -Dgpg.passphrase=$GPG_PASSPHRASE + # deploy default scala 2.12 + $MVN $PUBLISH_PROFILES clean package gpg:sign install:install deploy:deploy \ + -DaltDeploymentRepository=$DEPLOYMENT_REPOSITORY \ + -DskipTests=true -Dgpg.passphrase=$GPG_PASSPHRASE - cd "$BASE_DIR" #exit target + # deploy scala 2.11 + ./dev/change-scala-version.sh 2.11 + $MVN $PUBLISH_PROFILES clean package gpg:sign install:install deploy:deploy \ + -DaltDeploymentRepository=$DEPLOYMENT_REPOSITORY \ + -DskipTests=true -Dgpg.passphrase=$GPG_PASSPHRASE -Dscala-2.11 + cd "$BASE_DIR" # exit target exit 0 fi if [[ "$RELEASE_SNAPSHOT" == "true" ]]; then - # Checkout code + # checkout code checkout_code cd target/bahir - CURRENT_VERSION=$($MVN help:evaluate -Dexpression=project.version \ - | grep -v INFO | grep -v WARNING | grep -v Download) + DEPLOYMENT_REPOSITORY="apache.snapshots.https::default::https://repository.apache.org/content/repositories/snapshots" + CURRENT_VERSION=$($MVN help:evaluate -Dexpression=project.version | grep -v INFO | grep -v WARNING | grep -v Download) - # Publish Bahir Snapshots to Maven snapshot repo + # publish Bahir snapshots to maven repository echo "Deploying Bahir SNAPSHOT at '$GIT_REF' ($git_hash)" echo "Publish version is $CURRENT_VERSION" if [[ ! 
$CURRENT_VERSION == *"SNAPSHOT"* ]]; then @@ -316,19 +323,23 @@ if [[ "$RELEASE_SNAPSHOT" == "true" ]]; then exit 1 fi - #Deploy default scala 2.11 - $MVN $PUBLISH_PROFILES -DaltDeploymentRepository=apache.snapshots.https::default::https://repository.apache.org/content/repositories/snapshots clean package gpg:sign install:install deploy:deploy -DskiptTests -Darguments="-DskipTests" -Dgpg.passphrase=$GPG_PASSPHRASE + # deploy default scala 2.12 + $MVN $PUBLISH_PROFILES clean package gpg:sign install:install deploy:deploy \ + -DaltDeploymentRepository=$DEPLOYMENT_REPOSITORY \ + -DskipTests=true -Dgpg.passphrase=$GPG_PASSPHRASE - #Deploy scala 2.10 - ./dev/change-scala-version.sh 2.10 - $MVN $PUBLISH_PROFILES -DaltDeploymentRepository=apache.snapshots.https::default::https://repository.apache.org/content/repositories/snapshots clean package gpg:sign install:install deploy:deploy -DskiptTests -Darguments="-DskipTests" -Dscala-2.10 -Dgpg.passphrase=$GPG_PASSPHRASE + # deploy scala 2.11 + ./dev/change-scala-version.sh 2.11 + $MVN $PUBLISH_PROFILES clean package gpg:sign install:install deploy:deploy \ + -DaltDeploymentRepository=$DEPLOYMENT_REPOSITORY \ + -DskipTests=true -Dgpg.passphrase=$GPG_PASSPHRASE -Dscala-2.11 - cd "$BASE_DIR" #exit target + cd "$BASE_DIR" # exit target exit 0 fi -cd "$BASE_DIR" #return to base dir +cd "$BASE_DIR" # return to base directory rm -rf target echo "ERROR: wrong execution goals" exit_with_usage http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/distribution/pom.xml ---------------------------------------------------------------------- diff --git a/distribution/pom.xml b/distribution/pom.xml index 18ba854..05a311f 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -20,13 +20,13 @@ <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.bahir</groupId> - <artifactId>bahir-parent_2.11</artifactId> + <artifactId>bahir-parent_2.12</artifactId> <version>2.4.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <groupId>org.apache.bahir</groupId> - <artifactId>bahir-spark-distribution_2.11</artifactId> + <artifactId>bahir-spark-distribution_2.12</artifactId> <packaging>pom</packaging> <name>Apache Bahir - Spark Extensions Distribution</name> <url>http://bahir.apache.org/</url> http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index af70a76..ec6b353 100644 --- a/pom.xml +++ b/pom.xml @@ -24,7 +24,7 @@ <version>18</version> </parent> <groupId>org.apache.bahir</groupId> - <artifactId>bahir-parent_2.11</artifactId> + <artifactId>bahir-parent_2.12</artifactId> <version>2.4.0-SNAPSHOT</version> <packaging>pom</packaging> <name>Apache Bahir - Parent POM</name> @@ -90,18 +90,19 @@ <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + <skipTests>false</skipTests> <!-- General project dependencies version --> <java.version>1.8</java.version> <maven.version>3.3.9</maven.version> - <scala.version>2.11.12</scala.version> - <scala.binary.version>2.11</scala.binary.version> + <scala.version>2.12.7</scala.version> + <scala.binary.version>2.12</scala.binary.version> <slf4j.version>1.7.16</slf4j.version> <log4j.version>1.2.17</log4j.version> <!-- Spark version --> - <spark.version>2.3.2</spark.version> + <spark.version>2.4.0</spark.version> <!-- Hadoop version --> <hadoop.version>2.6.5</hadoop.version> @@ -179,7 +180,7 @@ 
published POMs are flattened and do not contain variables. Without this dependency, some subprojects' published POMs would contain variables like ${scala.binary.version} that will be substituted according to the default properties instead of the ones determined by the - profiles that were active during publishing, causing the Scala 2.10 build's POMs to have 2.11 + profiles that were active during publishing, causing the Scala 2.11 build's POMs to have 2.12 dependencies due to the incorrect substitutions. By ensuring that maven-shade runs for all subprojects, we eliminate this problem because the substitutions are baked into the final POM. @@ -204,6 +205,17 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <!-- + Temporary workaround. Spark Core 2.4.0 with Scala 2.12 depends on Apache Avro 1.8.2, + which pulls in transitive dependency of Paranamer 2.7. All integration tests fail + with error message same as SPARK-22128. Manually upgrading Paranamer to 2.8. + --> + <dependency> + <groupId>com.thoughtworks.paranamer</groupId> + <artifactId>paranamer</artifactId> + <version>2.8</version> + <scope>test</scope> + </dependency> </dependencies> <dependencyManagement> @@ -213,6 +225,11 @@ <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${spark.version}</version> <exclusions> + <!-- Temporary fix, see above. --> + <exclusion> + <groupId>com.thoughtworks.paranamer</groupId> + <artifactId>paranamer</artifactId> + </exclusion> <exclusion> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-plus</artifactId> @@ -308,7 +325,7 @@ <dependency> <groupId>org.json4s</groupId> <artifactId>json4s-jackson_${scala.binary.version}</artifactId> - <version>3.2.11</version> + <version>3.5.3</version> </dependency> <dependency> @@ -982,6 +999,9 @@ <plugin> <groupId>org.scalatest</groupId> <artifactId>scalatest-maven-plugin</artifactId> + <configuration> + <skipTests>${skipTests}</skipTests> + </configuration> </plugin> <!-- Build test-jar's for all projects, since some projects depend on tests from others --> <plugin> @@ -1040,33 +1060,20 @@ <profile> <id>distribution</id> - <modules> <module>distribution</module> </modules> </profile> - <!-- <profile> - <id>scala-2.10</id> + <id>scala-2.11</id> <activation> - <property><name>scala-2.10</name></property> + <property><name>scala-2.11</name></property> </activation> <properties> - <scala.version>2.10.6</scala.version> - <scala.binary.version>2.10</scala.binary.version> - <jline.version>${scala.version}</jline.version> - <jline.groupid>org.scala-lang</jline.groupid> + <scala.version>2.11.12</scala.version> + <scala.binary.version>2.11</scala.binary.version> </properties> - <dependencyManagement> - <dependencies> - <dependency> - <groupId>${jline.groupid}</groupId> - <artifactId>jline</artifactId> - <version>${jline.version}</version> - </dependency> - </dependencies> - </dependencyManagement> <build> <plugins> <plugin> @@ -1082,7 +1089,7 @@ <rules> <bannedDependencies> <excludes combine.children="append"> - <exclude>*:*_2.11</exclude> + <exclude>*:*_2.12</exclude> </excludes> </bannedDependencies> </rules> @@ -1095,13 +1102,13 @@ </profile> <profile> - <id>scala-2.11</id> + <id>scala-2.12</id> <activation> - <property><name>!scala-2.10</name></property> + <property><name>!scala-2.11</name></property> </activation> <properties> - <scala.version>2.11.8</scala.version> - <scala.binary.version>2.11</scala.binary.version> + <scala.version>2.12.7</scala.version> + <scala.binary.version>2.12</scala.binary.version> </properties> 
<build> <plugins> @@ -1118,7 +1125,7 @@ <rules> <bannedDependencies> <excludes combine.children="append"> - <exclude>*:*_2.10</exclude> + <exclude>*:*_2.11</exclude> </excludes> </bannedDependencies> </rules> @@ -1129,17 +1136,6 @@ </plugins> </build> </profile> - --> - - <profile> - <id>test-java-home</id> - <activation> - <property><name>env.JAVA_HOME</name></property> - </activation> - <properties> - <test.java.home>${env.JAVA_HOME}</test.java.home> - </properties> - </profile> </profiles> </project> http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-cloudant/README.md ---------------------------------------------------------------------- diff --git a/sql-cloudant/README.md b/sql-cloudant/README.md index b651990..d315d7f 100644 --- a/sql-cloudant/README.md +++ b/sql-cloudant/README.md @@ -37,7 +37,7 @@ Submit a job in Scala: spark-submit --class "<your class>" --master local[4] --packages org.apache.bahir:spark-sql-cloudant__{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION}} <path to spark-sql-cloudant jar> -This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards. +This library is cross-published for Scala 2.11 and Scala 2.12, so users should replace the proper Scala version in the commands listed above. ## Configuration options The configuration is obtained in the following sequence: http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-cloudant/pom.xml ---------------------------------------------------------------------- diff --git a/sql-cloudant/pom.xml b/sql-cloudant/pom.xml index d81232a..a14862b 100644 --- a/sql-cloudant/pom.xml +++ b/sql-cloudant/pom.xml @@ -20,13 +20,13 @@ <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.bahir</groupId> - <artifactId>bahir-parent_2.11</artifactId> + <artifactId>bahir-parent_2.12</artifactId> <version>2.4.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <groupId>org.apache.bahir</groupId> - <artifactId>spark-sql-cloudant_2.11</artifactId> + <artifactId>spark-sql-cloudant_2.12</artifactId> <properties> <sbt.project.name>spark-sql-cloudant</sbt.project.name> </properties> http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala ---------------------------------------------------------------------- diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala index 47643cc..84babdd 100644 --- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala +++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala @@ -76,7 +76,7 @@ case class CloudantReadWriteRelation (config: CloudantConfig, logger.warn("Database " + config.getDbname + ": nothing was saved because the number of records was 0!") } else { - val result = data.toJSON.foreachPartition { x => + data.toJSON.foreachPartition { x: Iterator[String] => val list = x.toList // Has to pass as List, Iterator results in 0 data dataAccess.saveAll(list) } http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-akka/README.md ---------------------------------------------------------------------- diff --git a/sql-streaming-akka/README.md b/sql-streaming-akka/README.md index 29685ee..a29979b 100644 --- a/sql-streaming-akka/README.md +++ b/sql-streaming-akka/README.md @@ -22,7 +22,7 @@ For example, to include it when starting the spark shell: Unlike using `--jars`, using `--packages` ensures 
that this library and its dependencies will be added to the classpath. The `--packages` argument can also be used with `bin/spark-submit`. -This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards. +This library is cross-published for Scala 2.11 and Scala 2.12, so users should replace the proper Scala version in the commands listed above. ## Examples http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-akka/pom.xml ---------------------------------------------------------------------- diff --git a/sql-streaming-akka/pom.xml b/sql-streaming-akka/pom.xml index 98586c7..6d50325 100644 --- a/sql-streaming-akka/pom.xml +++ b/sql-streaming-akka/pom.xml @@ -20,13 +20,13 @@ <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.bahir</groupId> - <artifactId>bahir-parent_2.11</artifactId> + <artifactId>bahir-parent_2.12</artifactId> <version>2.4.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <groupId>org.apache.bahir</groupId> - <artifactId>spark-sql-streaming-akka_2.11</artifactId> + <artifactId>spark-sql-streaming-akka_2.12</artifactId> <properties> <sbt.project.name>sql-streaming-akka</sbt.project.name> </properties> http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala index 3f2101c..f20a917 100644 --- a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala +++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala @@ -41,10 +41,12 @@ import com.typesafe.config.ConfigFactory import org.rocksdb.{Options, RocksDB} import org.apache.spark.SparkEnv -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen.UTF8StringBuilder import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport} -import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader} import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset} import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} @@ -262,7 +264,7 @@ class AkkaMicroBatchReader(urlOfPublisher: String, override def readSchema(): StructType = AkkaStreamConstants.SCHEMA_DEFAULT - override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = { + override def planInputPartitions(): util.List[InputPartition[InternalRow]] = { assert(startOffset != null && endOffset != null, "start offset and end offset should already be set before create read tasks.") @@ -283,8 +285,9 @@ class AkkaMicroBatchReader(urlOfPublisher: String, (0 until numPartitions).map { i => val slice = slices(i) - new DataReaderFactory[Row] { - override def createDataReader(): DataReader[Row] = new DataReader[Row] { + new InputPartition[InternalRow] { + override def createPartitionReader(): InputPartitionReader[InternalRow] = + new InputPartitionReader[InternalRow] { private var currentIdx = -1 override def 
next(): Boolean = { @@ -292,8 +295,10 @@ class AkkaMicroBatchReader(urlOfPublisher: String, currentIdx < slice.size } - override def get(): Row = { - Row.fromTuple(slice(currentIdx)) + override def get(): InternalRow = { + val builder = new UTF8StringBuilder() + builder.append(slice(currentIdx)._1) + InternalRow(builder.build(), slice(currentIdx)._2.getTime) } override def close(): Unit = {} http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/README.md ---------------------------------------------------------------------- diff --git a/sql-streaming-mqtt/README.md b/sql-streaming-mqtt/README.md index 721b544..0fbf63e 100644 --- a/sql-streaming-mqtt/README.md +++ b/sql-streaming-mqtt/README.md @@ -22,7 +22,7 @@ For example, to include it when starting the spark shell: Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath. The `--packages` argument can also be used with `bin/spark-submit`. -This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards. +This library is cross-published for Scala 2.11 and Scala 2.12, so users should replace the proper Scala version in the commands listed above. ## Examples http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/pom.xml ---------------------------------------------------------------------- diff --git a/sql-streaming-mqtt/pom.xml b/sql-streaming-mqtt/pom.xml index 3f818f8..53ccc32 100644 --- a/sql-streaming-mqtt/pom.xml +++ b/sql-streaming-mqtt/pom.xml @@ -20,13 +20,13 @@ <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.bahir</groupId> - <artifactId>bahir-parent_2.11</artifactId> + <artifactId>bahir-parent_2.12</artifactId> <version>2.4.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <groupId>org.apache.bahir</groupId> - <artifactId>spark-sql-streaming-mqtt_2.11</artifactId> + <artifactId>spark-sql-streaming-mqtt_2.12</artifactId> <properties> <sbt.project.name>sql-streaming-mqtt</sbt.project.name> </properties> http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala index 8925e93..78eae52 100644 --- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala +++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala @@ -94,8 +94,16 @@ private[mqtt] object CachedMQTTClient extends Logging { private def closeMqttClient(params: Seq[(String, String)], client: MqttClient, persistence: MqttClientPersistence): Unit = { try { - client.disconnect() - persistence.close() + if (client.isConnected) { + client.disconnect() + } + try { + persistence.close() + } catch { + case NonFatal(e) => log.warn( + s"Error while closing MQTT persistent store ${e.getMessage}", e + ) + } client.close() } catch { case NonFatal(e) => log.warn(s"Error while closing MQTT client ${e.getMessage}", e) http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala ---------------------------------------------------------------------- diff --git 
a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala index 846765c..23385f4 100644 --- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala +++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala @@ -17,18 +17,17 @@ package org.apache.bahir.sql.streaming.mqtt -import java.nio.charset.Charset - import scala.collection.JavaConverters._ +import scala.collection.mutable import org.eclipse.paho.client.mqttv3.MqttException import org.apache.spark.SparkEnv -import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession, SQLContext} -import org.apache.spark.sql.execution.streaming.sources.{PackedRowCommitMessage, PackedRowWriterFactory} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister} import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamWriteSupport} -import org.apache.spark.sql.sources.v2.writer.{DataWriterFactory, WriterCommitMessage} +import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage} import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType @@ -39,67 +38,50 @@ import org.apache.bahir.utils.Retry class MQTTStreamWriter (schema: StructType, parameters: DataSourceOptions) extends StreamWriter with Logging { - private lazy val publishAttempts: Int = - SparkEnv.get.conf.getInt("spark.mqtt.client.publish.attempts", -1) - private lazy val publishBackoff: Long = - SparkEnv.get.conf.getTimeAsMs("spark.mqtt.client.publish.backoff", "5s") + override def createWriterFactory(): DataWriterFactory[InternalRow] = { + // Skipping client identifier as single batch can be distributed to multiple + // Spark worker process. MQTT server does not support two connections + // declaring same client ID at given point in time. 
+ val params = parameters.asMap().asScala.filterNot( + _._1.equalsIgnoreCase("clientId") + ) + MQTTDataWriterFactory(params) + } - assert(SparkSession.getActiveSession.isDefined) - private val spark = SparkSession.getActiveSession.get + override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} - private var topic: String = _ - private var qos: Int = -1 + override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} +} - initialize() - private def initialize(): Unit = { - val (_, _, topic_, _, _, qos_, _, _, _) = MQTTUtils.parseConfigParams( - collection.immutable.HashMap() ++ parameters.asMap().asScala - ) - topic = topic_ - qos = qos_ - } +case class MQTTDataWriterFactory(config: mutable.Map[String, String]) + extends DataWriterFactory[InternalRow] { + override def createDataWriter( + partitionId: Int, taskId: Long, epochId: Long + ): DataWriter[InternalRow] = new MQTTDataWriter(config) +} - override def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory +case object MQTTWriterCommitMessage extends WriterCommitMessage - override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = { - commit(messages) - } +class MQTTDataWriter(config: mutable.Map[String, String]) extends DataWriter[InternalRow] { + private lazy val publishAttempts: Int = + SparkEnv.get.conf.getInt("spark.mqtt.client.publish.attempts", -1) + private lazy val publishBackoff: Long = + SparkEnv.get.conf.getTimeAsMs("spark.mqtt.client.publish.backoff", "5s") - override def commit(messages: Array[WriterCommitMessage]): Unit = { - val rows = messages.collect { - case PackedRowCommitMessage(rs) => rs - }.flatten + private lazy val (_, _, topic, _, _, qos, _, _, _) = MQTTUtils.parseConfigParams(config.toMap) - // Skipping client identifier as single batch can be distributed to multiple - // Spark worker process. MQTT server does not support two connections - // declaring same client ID at given point in time. - val params_ = Seq() ++ parameters.asMap().asScala.toSeq.filterNot( - _._1.equalsIgnoreCase("clientId") - ) - // IMPL Note: Had to declare new value reference due to serialization requirements. - val topic_ = topic - val qos_ = qos - val publishAttempts_ = publishAttempts - val publishBackoff_ = publishBackoff - - val data = spark.createDataFrame(rows.toList.asJava, schema) - data.foreachPartition ( - iterator => iterator.foreach( - row => { - val client = CachedMQTTClient.getOrCreate(params_.toMap) - val message = row.mkString.getBytes(Charset.defaultCharset()) - Retry(publishAttempts_, publishBackoff_, classOf[MqttException]) { - // In case of errors, retry sending the message. - client.publish(topic_, message, qos_, false) - } - } - ) - ) + override def write(record: InternalRow): Unit = { + val client = CachedMQTTClient.getOrCreate(config.toMap) + val message = record.getBinary(0) + Retry(publishAttempts, publishBackoff, classOf[MqttException]) { + // In case of errors, retry sending the message. 
+      client.publish(topic, message, qos, false)
+    }
   }

-  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+  override def commit(): WriterCommitMessage = MQTTWriterCommitMessage

-  override def abort(messages: Array[WriterCommitMessage]): Unit = {}
+  override def abort(): Unit = {}
 }

 case class MQTTRelation(override val sqlContext: SQLContext, data: DataFrame)

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
index 7146ecc..e1314ae 100644
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
+++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
@@ -17,6 +17,7 @@
 package org.apache.bahir.sql.streaming.mqtt

+import java.{util => jutil}
 import java.nio.charset.Charset
 import java.sql.Timestamp
 import java.text.SimpleDateFormat
@@ -31,9 +32,10 @@ import scala.collection.mutable.ListBuffer
 import org.eclipse.paho.client.mqttv3._

 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport}
-import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2}
 import org.apache.spark.sql.types._

@@ -169,7 +171,7 @@ class MQTTStreamSource(options: DataSourceOptions, brokerUrl: String, persistenc
     MQTTStreamConstants.SCHEMA_DEFAULT
   }

-  override def createDataReaderFactories(): java.util.List[DataReaderFactory[Row]] = {
+  override def planInputPartitions(): jutil.List[InputPartition[InternalRow]] = {
     val rawList: IndexedSeq[MQTTMessage] = synchronized {
       val sliceStart = LongOffset.convert(startOffset).get.offset + 1
       val sliceEnd = LongOffset.convert(endOffset).get.offset + 1
@@ -186,8 +188,9 @@ class MQTTStreamSource(options: DataSourceOptions, brokerUrl: String, persistenc

     (0 until numPartitions).map { i =>
       val slice = slices(i)
-      new DataReaderFactory[Row] {
-        override def createDataReader(): DataReader[Row] = new DataReader[Row] {
+      new InputPartition[InternalRow] {
+        override def createPartitionReader(): InputPartitionReader[InternalRow] =
+          new InputPartitionReader[InternalRow] {
           private var currentIdx = -1

           override def next(): Boolean = {
@@ -195,8 +198,8 @@ class MQTTStreamSource(options: DataSourceOptions, brokerUrl: String, persistenc
             currentIdx < slice.size
           }

-          override def get(): Row = {
-            Row(slice(currentIdx).id, slice(currentIdx).topic,
+          override def get(): InternalRow = {
+            InternalRow(slice(currentIdx).id, slice(currentIdx).topic,
               slice(currentIdx).payload, slice(currentIdx).timestamp)
           }
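
Both reader rewrites in this commit — the Akka source earlier and the MQTT source above — track the DataSourceV2 rename that landed in Spark 2.4: createDataReaderFactories/DataReaderFactory/DataReader become planInputPartitions/InputPartition/InputPartitionReader, and readers now hand back InternalRow instead of Row, so string columns must be encoded as UTF8String (the Akka source does this with UTF8StringBuilder; UTF8String.fromString is equivalent). A minimal sketch of the shape both sources now implement, assuming pre-computed (topic, payload) slices as a stand-in for Bahir's buffered message lists:

    import java.{util => jutil}

    import scala.collection.JavaConverters._

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
    import org.apache.spark.unsafe.types.UTF8String

    object PartitionSketch {
      // One InputPartition per pre-computed slice of buffered messages.
      def planPartitions(
          slices: Seq[Seq[(String, Array[Byte])]]): jutil.List[InputPartition[InternalRow]] = {
        slices.map { slice =>
          val partition: InputPartition[InternalRow] = new InputPartition[InternalRow] {
            override def createPartitionReader(): InputPartitionReader[InternalRow] =
              new InputPartitionReader[InternalRow] {
                private var currentIdx = -1

                override def next(): Boolean = {
                  currentIdx += 1
                  currentIdx < slice.size
                }

                // Strings cross into InternalRow as UTF8String, not java.lang.String.
                override def get(): InternalRow = InternalRow(
                  UTF8String.fromString(slice(currentIdx)._1), slice(currentIdx)._2)

                override def close(): Unit = {}
              }
          }
          partition
        }.asJava
      }
    }

The write side mirrors the same move: MQTTDataWriterFactory and MQTTDataWriter above replace the earlier PackedRowWriterFactory-based commit path, which is why the sink now reads the payload directly with record.getBinary(0).

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/src/main/scala/org/apache/spark/sql/mqtt/HdfsBasedMQTTStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/spark/sql/mqtt/HdfsBasedMQTTStreamSource.scala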
b/sql-streaming-mqtt/src/main/scala/org/apache/spark/sql/mqtt/HdfsBasedMQTTStreamSource.scala index e6e202b..fd39557 100644 --- a/sql-streaming-mqtt/src/main/scala/org/apache/spark/sql/mqtt/HdfsBasedMQTTStreamSource.scala +++ b/sql-streaming-mqtt/src/main/scala/org/apache/spark/sql/mqtt/HdfsBasedMQTTStreamSource.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.mqtt import java.io.IOException -import java.sql.Timestamp import java.util.Calendar import java.util.concurrent.locks.{Lock, ReentrantLock} @@ -28,10 +27,9 @@ import org.eclipse.paho.client.mqttv3._ import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence import org.apache.spark.internal.Logging -import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.execution.streaming.HDFSMetadataLog.FileContextManager import org.apache.spark.sql.types.StructType import org.apache.spark.unsafe.types.UTF8String @@ -191,7 +189,7 @@ class HdfsBasedMQTTStreamSource( // recover message data file offset from hdfs val dataPath = new Path(receivedDataPath) if (fs.exists(dataPath)) { - val fileManager = new FileContextManager(dataPath, hadoopConfig) + val fileManager = CheckpointFileManager.create(dataPath, hadoopConfig) val dataFileIndexs = fileManager.list(dataPath, new PathFilter { private def isBatchFile(path: Path) = { try { http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala index ecdd942..ab24cb3 100644 --- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala +++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala @@ -22,19 +22,17 @@ import java.net.ConnectException import java.util import org.eclipse.paho.client.mqttv3.MqttClient +import org.eclipse.paho.client.mqttv3.MqttException import org.scalatest.BeforeAndAfter import scala.collection.JavaConverters._ import scala.collection.mutable import scala.concurrent.Future -import org.apache.spark.{SharedSparkContext, SparkEnv, SparkException, SparkFunSuite} -import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext} -import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema +import org.apache.spark.{SharedSparkContext, SparkEnv, SparkFunSuite} +import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext} import org.apache.spark.sql.execution.streaming.MemoryStream -import org.apache.spark.sql.execution.streaming.sources.PackedRowCommitMessage import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery} -import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.bahir.utils.FileHelper @@ -56,7 +54,8 @@ class MQTTStreamSinkSuite(_ssl: Boolean) extends SparkFunSuite } after { - testClient.disconnect() + CachedMQTTClient.clear() + testClient.disconnectForcibly() testClient.close() mqttTestUtils.teardown() FileHelper.deleteFileQuietly(tempDir) @@ -95,15 +94,12 @@ class BasicMQTTSinkSuite extends 
MQTTStreamSinkSuite(false) { "topic" -> "test", "localStorage" -> tempDir.getAbsoluteFile.toString ) - val schema = StructType(StructField("value", StringType) :: Nil) - val messages : Array[Row] = Array(new GenericRowWithSchema(Array("value1"), schema)) - val thrown: Exception = intercept[SparkException] { + val thrown: Exception = intercept[MqttException] { provider.createStreamWriter( - "query1", schema, OutputMode.Complete(), new DataSourceOptions(parameters.asJava) - ).commit(1, Array(PackedRowCommitMessage(messages))) + "query1", null, OutputMode.Complete(), new DataSourceOptions(parameters.asJava) + ).createWriterFactory().createDataWriter(1, 1, 1).write(null) } - // SparkException -> MqttException -> ConnectException - assert(thrown.getCause.getCause.isInstanceOf[ConnectException]) + assert(thrown.getCause.isInstanceOf[ConnectException]) } test("basic usage") { @@ -111,7 +107,7 @@ class BasicMQTTSinkSuite extends MQTTStreamSinkSuite(false) { val msg2 = "MQTT is a message queue." val (_, dataFrame) = createContextAndDF(msg1, msg2) - sendToMQTT(dataFrame).awaitTermination(3000) + sendToMQTT(dataFrame).awaitTermination(5000) assert(Set(msg1, msg2).equals(messages.values.toSet)) } @@ -120,7 +116,7 @@ class BasicMQTTSinkSuite extends MQTTStreamSinkSuite(false) { val msg = List.tabulate(100)(n => "Hello, World!" + n) val (_, dataFrame) = createContextAndDF(msg: _*) - sendToMQTT(dataFrame).awaitTermination(3000) + sendToMQTT(dataFrame).awaitTermination(5000) assert(Set(msg: _*).equals(messages.values.toSet)) } @@ -134,13 +130,13 @@ class BasicMQTTSinkSuite extends MQTTStreamSinkSuite(false) { intercept[IllegalArgumentException] { provider.createStreamWriter( "query1", null, OutputMode.Complete(), new DataSourceOptions(parameters.asJava) - ) + ).createWriterFactory().createDataWriter(1, 1, 1).write(null) } intercept[IllegalArgumentException] { provider.createStreamWriter( "query1", null, OutputMode.Complete(), new DataSourceOptions(new util.HashMap[String, String]) - ) + ).createWriterFactory().createDataWriter(1, 1, 1).write(null) } } } @@ -163,7 +159,7 @@ class StressTestMQTTSink extends MQTTStreamSinkSuite(false) { val freeMemory: Long = Runtime.getRuntime.freeMemory() log.info(s"Available memory before test run is ${freeMemory / (1024 * 1024)}MB.") val noOfMsgs: Int = 200 - val noOfBatches: Int = 10 + val noOfBatches: Int = 5 val messageBuilder = new StringBuilder() for (i <- 0 until (500 * 1024)) yield messageBuilder.append(((i % 26) + 65).toChar) http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala ---------------------------------------------------------------------- diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala index c4e340c..39cf0df 100644 --- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala +++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala @@ -33,6 +33,7 @@ import org.scalatest.time.Span import org.apache.spark.{SharedSparkContext, SparkFunSuite} import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQuery} @@ -157,15 +158,15 @@ class BasicMQTTSourceSuite extends 
MQTTStreamSourceSuite { // Clear in-memory cache to simulate recovery. source.messages.clear() source.setOffsetRange(Optional.empty(), Optional.empty()) - var message: Row = null - for (f <- source.createDataReaderFactories().asScala) { - val dataReader = f.createDataReader() + var message: InternalRow = null + for (f <- source.planInputPartitions().asScala) { + val dataReader = f.createPartitionReader() if (dataReader.next()) { message = dataReader.get() } } source.commit(source.getCurrentOffset) - assert(payload == new String(message.getAs[Array[Byte]](2), "UTF-8")) + assert(payload == new String(message.getBinary(2), "UTF-8")) } test("no server up") { http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-akka/README.md ---------------------------------------------------------------------- diff --git a/streaming-akka/README.md b/streaming-akka/README.md index f57583e..bff9c25 100644 --- a/streaming-akka/README.md +++ b/streaming-akka/README.md @@ -23,7 +23,7 @@ For example, to include it when starting the spark shell: Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath. The `--packages` argument can also be used with `bin/spark-submit`. -This library is cross-published for Scala 2.10 and Scala 2.11, so users should replace the proper Scala version (2.10 or 2.11) in the commands listed above. +This library is cross-published for Scala 2.11 and Scala 2.12, so users should replace the proper Scala version in the commands listed above. ## Examples http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-akka/pom.xml ---------------------------------------------------------------------- diff --git a/streaming-akka/pom.xml b/streaming-akka/pom.xml index 5b94c7a..0d4e42f 100644 --- a/streaming-akka/pom.xml +++ b/streaming-akka/pom.xml @@ -20,13 +20,13 @@ <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.bahir</groupId> - <artifactId>bahir-parent_2.11</artifactId> + <artifactId>bahir-parent_2.12</artifactId> <version>2.4.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <groupId>org.apache.bahir</groupId> - <artifactId>spark-streaming-akka_2.11</artifactId> + <artifactId>spark-streaming-akka_2.12</artifactId> <properties> <sbt.project.name>streaming-akka</sbt.project.name> </properties> http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-mqtt/README.md ---------------------------------------------------------------------- diff --git a/streaming-mqtt/README.md b/streaming-mqtt/README.md index 05542de..811f822 100644 --- a/streaming-mqtt/README.md +++ b/streaming-mqtt/README.md @@ -23,7 +23,7 @@ For example, to include it when starting the spark shell: Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath. The `--packages` argument can also be used with `bin/spark-submit`. -This library is cross-published for Scala 2.10 and Scala 2.11, so users should replace the proper Scala version (2.10 or 2.11) in the commands listed above. +This library is cross-published for Scala 2.11 and Scala 2.12, so users should replace the proper Scala version in the commands listed above. ## Configuration options. 
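
The README hunks for the streaming-* modules swap only the advertised Scala versions; the connector APIs themselves are unchanged. For orientation, a minimal sketch of consuming the DStream-based MQTT connector that this README documents, assuming the spark-streaming-mqtt_2.12 artifact is on the classpath via --packages, with placeholder broker URL and topic:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.mqtt.MQTTUtils

    object MQTTWordCount {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("MQTTWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        // Each MQTT payload arrives as one String line on the DStream.
        val lines = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "topic/test")
        lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }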
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-mqtt/pom.xml ---------------------------------------------------------------------- diff --git a/streaming-mqtt/pom.xml b/streaming-mqtt/pom.xml index 0f6d809..44ed1e0 100644 --- a/streaming-mqtt/pom.xml +++ b/streaming-mqtt/pom.xml @@ -20,13 +20,13 @@ <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.bahir</groupId> - <artifactId>bahir-parent_2.11</artifactId> + <artifactId>bahir-parent_2.12</artifactId> <version>2.4.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <groupId>org.apache.bahir</groupId> - <artifactId>spark-streaming-mqtt_2.11</artifactId> + <artifactId>spark-streaming-mqtt_2.12</artifactId> <properties> <sbt.project.name>streaming-mqtt</sbt.project.name> </properties> http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-pubnub/pom.xml ---------------------------------------------------------------------- diff --git a/streaming-pubnub/pom.xml b/streaming-pubnub/pom.xml index 464cfce..683cb29 100644 --- a/streaming-pubnub/pom.xml +++ b/streaming-pubnub/pom.xml @@ -19,13 +19,13 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> - <artifactId>bahir-parent_2.11</artifactId> + <artifactId>bahir-parent_2.12</artifactId> <groupId>org.apache.bahir</groupId> <version>2.4.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> - <artifactId>spark-streaming-pubnub_2.11</artifactId> + <artifactId>spark-streaming-pubnub_2.12</artifactId> <properties> <sbt.project.name>streaming-pubnub</sbt.project.name> </properties> http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-pubsub/pom.xml ---------------------------------------------------------------------- diff --git a/streaming-pubsub/pom.xml b/streaming-pubsub/pom.xml index f6ecd37..3f92983 100644 --- a/streaming-pubsub/pom.xml +++ b/streaming-pubsub/pom.xml @@ -19,14 +19,14 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> - <artifactId>bahir-parent_2.11</artifactId> + <artifactId>bahir-parent_2.12</artifactId> <groupId>org.apache.bahir</groupId> <version>2.4.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <groupId>org.apache.bahir</groupId> - <artifactId>spark-streaming-pubsub_2.11</artifactId> + <artifactId>spark-streaming-pubsub_2.12</artifactId> <properties> <sbt.project.name>streaming-pubsub</sbt.project.name> </properties> http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-twitter/README.md ---------------------------------------------------------------------- diff --git a/streaming-twitter/README.md b/streaming-twitter/README.md index 4123ea9..1703606 100644 --- a/streaming-twitter/README.md +++ b/streaming-twitter/README.md @@ -23,7 +23,7 @@ For example, to include it when starting the spark shell: Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath. The `--packages` argument can also be used with `bin/spark-submit`. -This library is cross-published for Scala 2.10 and Scala 2.11, so users should replace the proper Scala version (2.10 or 2.11) in the commands listed above. 
+This library is cross-published for Scala 2.11 and Scala 2.12, so users should replace the proper Scala version in the commands listed above. ## Examples http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-twitter/pom.xml ---------------------------------------------------------------------- diff --git a/streaming-twitter/pom.xml b/streaming-twitter/pom.xml index 2bf29b5..f031771 100644 --- a/streaming-twitter/pom.xml +++ b/streaming-twitter/pom.xml @@ -20,13 +20,13 @@ <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.bahir</groupId> - <artifactId>bahir-parent_2.11</artifactId> + <artifactId>bahir-parent_2.12</artifactId> <version>2.4.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <groupId>org.apache.bahir</groupId> - <artifactId>spark-streaming-twitter_2.11</artifactId> + <artifactId>spark-streaming-twitter_2.12</artifactId> <properties> <sbt.project.name>streaming-twitter</sbt.project.name> </properties> @@ -72,7 +72,7 @@ <dependency> <groupId>com.twitter</groupId> <artifactId>algebird-core_${scala.binary.version}</artifactId> - <version>0.11.0</version> + <version>0.12.4</version> <scope>test</scope> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-zeromq/README.md ---------------------------------------------------------------------- diff --git a/streaming-zeromq/README.md b/streaming-zeromq/README.md index 8ced539..8d57424 100644 --- a/streaming-zeromq/README.md +++ b/streaming-zeromq/README.md @@ -24,7 +24,7 @@ For example, to include it when starting the spark shell: Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath. The `--packages` argument can also be used with `bin/spark-submit`. -This library is cross-published for Scala 2.10 and Scala 2.11, so users should replace the proper Scala version (2.10 or 2.11) in the commands listed above. +This library is cross-published for Scala 2.11 and Scala 2.12, so users should replace the proper Scala version in the commands listed above. ## Examples http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-zeromq/pom.xml ---------------------------------------------------------------------- diff --git a/streaming-zeromq/pom.xml b/streaming-zeromq/pom.xml index 5aaf2fa..ec1c4a7 100644 --- a/streaming-zeromq/pom.xml +++ b/streaming-zeromq/pom.xml @@ -20,13 +20,13 @@ <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.bahir</groupId> - <artifactId>bahir-parent_2.11</artifactId> + <artifactId>bahir-parent_2.12</artifactId> <version>2.4.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <groupId>org.apache.bahir</groupId> - <artifactId>spark-streaming-zeromq_2.11</artifactId> + <artifactId>spark-streaming-zeromq_2.12</artifactId> <properties> <sbt.project.name>streaming-zeromq</sbt.project.name> </properties>
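
One note for readers building the tree: the parent POM above pins Paranamer 2.8 because Paranamer 2.7, pulled in transitively through Avro 1.8.2 by Spark Core 2.4.0, cannot read constructor parameter names out of Scala 2.12 bytecode — the failure mode tracked in SPARK-22128. A quick way to observe the difference in isolation; this is a sketch with an arbitrary case class, not part of the commit:

    import com.thoughtworks.paranamer.BytecodeReadingParanamer

    case class Coffee(name: String, price: Double)

    object ParanamerCheck extends App {
      val ctor = classOf[Coffee].getConstructors.head
      // With paranamer 2.8 this prints "name, price"; with 2.7 on Scala 2.12
      // bytecode it fails while parsing the class file.
      println(new BytecodeReadingParanamer().lookupParameterNames(ctor).mkString(", "))
    }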
