Repository: incubator-predictionio Updated Branches: refs/heads/develop 5b02cd1a5 -> 3330013b9
[PIO-61] Add S3 Model Data Repository Closes #371 Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/3330013b Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/3330013b Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/3330013b Branch: refs/heads/develop Commit: 3330013b9d19f2c21e18f9b18e1a48fda3912e68 Parents: 5b02cd1 Author: Shinsuke Sugaya <[email protected]> Authored: Mon Jun 5 14:29:23 2017 -0700 Committer: Donald Szeto <[email protected]> Committed: Mon Jun 5 14:29:23 2017 -0700 ---------------------------------------------------------------------- .travis.yml | 5 + build.sbt | 7 +- conf/pio-env.sh.template | 5 + storage/s3/.gitignore | 1 + storage/s3/build.sbt | 44 ++++++++ .../predictionio/data/storage/s3/S3Models.scala | 101 +++++++++++++++++++ .../data/storage/s3/StorageClient.scala | 45 +++++++++ .../predictionio/data/storage/s3/package.scala | 25 +++++ tests/Dockerfile | 2 + tests/docker-compose.yml | 6 ++ tests/docker-files/awscredentials | 19 ++++ tests/docker-files/env-conf/pio-env.sh | 9 ++ tests/docker-files/init.sh | 4 + tests/run_docker.sh | 4 +- 14 files changed, 274 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/3330013b/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index 8dcc2fa..c8b7826 100644 --- a/.travis.yml +++ b/.travis.yml @@ -98,6 +98,11 @@ env: PIO_SCALA_VERSION=2.11.8 PIO_SPARK_VERSION=2.1.0 PIO_ELASTICSEARCH_VERSION=1.7.3 + - BUILD_TYPE=Integration + METADATA_REP=ELASTICSEARCH EVENTDATA_REP=ELASTICSEARCH MODELDATA_REP=S3 + PIO_SCALA_VERSION=2.11.8 + PIO_SPARK_VERSION=2.1.0 + PIO_ELASTICSEARCH_VERSION=5.2.2 before_install: - unset SBT_OPTS JVM_OPTS 
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/3330013b/build.sbt ---------------------------------------------------------------------- diff --git a/build.sbt b/build.sbt index e49613f..4287ea9 100644 --- a/build.sbt +++ b/build.sbt @@ -129,6 +129,10 @@ val dataLocalfs = (project in file("storage/localfs")). settings(commonSettings: _*). enablePlugins(GenJavadocPlugin) +val dataS3 = (project in file("storage/s3")). + settings(commonSettings: _*). + enablePlugins(GenJavadocPlugin) + val common = (project in file("common")). settings(commonSettings: _*). enablePlugins(GenJavadocPlugin). @@ -173,7 +177,8 @@ val storageSubprojects = Seq( dataHbase, dataHdfs, dataJdbc, - dataLocalfs) + dataLocalfs, + dataS3) val storage = (project in file("storage")) .aggregate(storageSubprojects map Project.projectToRef: _*) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/3330013b/conf/pio-env.sh.template ---------------------------------------------------------------------- diff --git a/conf/pio-env.sh.template b/conf/pio-env.sh.template index c3a1e07..2e55900 100644 --- a/conf/pio-env.sh.template +++ b/conf/pio-env.sh.template @@ -104,3 +104,8 @@ PIO_STORAGE_SOURCES_PGSQL_PASSWORD=pio # HBase Example # PIO_STORAGE_SOURCES_HBASE_TYPE=hbase # PIO_STORAGE_SOURCES_HBASE_HOME=$PIO_HOME/vendors/hbase-1.0.0 + +# AWS S3 Example +# PIO_STORAGE_SOURCES_S3_TYPE=s3 +# PIO_STORAGE_SOURCES_S3_BUCKET_NAME=pio_bucket +# PIO_STORAGE_SOURCES_S3_BASE_PATH=pio_model http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/3330013b/storage/s3/.gitignore ---------------------------------------------------------------------- diff --git a/storage/s3/.gitignore b/storage/s3/.gitignore new file mode 100644 index 0000000..ae3c172 --- /dev/null +++ b/storage/s3/.gitignore @@ -0,0 +1 @@ +/bin/ http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/3330013b/storage/s3/build.sbt ---------------------------------------------------------------------- diff 
--git a/storage/s3/build.sbt b/storage/s3/build.sbt new file mode 100644 index 0000000..4022209 --- /dev/null +++ b/storage/s3/build.sbt @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import PIOBuild._ + +name := "apache-predictionio-data-s3" + +libraryDependencies ++= Seq( + "org.apache.predictionio" %% "apache-predictionio-core" % version.value % "provided", + "com.google.guava" % "guava" % "14.0.1" % "provided", + "com.amazonaws" % "aws-java-sdk-s3" % "1.11.132", + "org.scalatest" %% "scalatest" % "2.1.7" % "test") + +parallelExecution in Test := false + +pomExtra := childrenPomExtra.value + +assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false) + +assemblyShadeRules in assembly := Seq( + ShadeRule.rename("org.apache.http.**" -> "shadeio.data.s3.http.@1").inAll, + ShadeRule.rename("com.fasterxml.**" -> "shadeio.data.s3.fasterxml.@1").inAll +) + +// skip test in assembly +test in assembly := {} + +assemblyOutputPath in assembly := baseDirectory.value.getAbsoluteFile.getParentFile.getParentFile / + "assembly" / "src" / "universal" / "lib" / "spark" / + s"pio-data-s3-assembly-${version.value}.jar" 
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/3330013b/storage/s3/src/main/scala/org/apache/predictionio/data/storage/s3/S3Models.scala ---------------------------------------------------------------------- diff --git a/storage/s3/src/main/scala/org/apache/predictionio/data/storage/s3/S3Models.scala b/storage/s3/src/main/scala/org/apache/predictionio/data/storage/s3/S3Models.scala new file mode 100644 index 0000000..f907e05 --- /dev/null +++ b/storage/s3/src/main/scala/org/apache/predictionio/data/storage/s3/S3Models.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.predictionio.data.storage.s3
+
+import java.io.ByteArrayInputStream
+
+import scala.util.control.NonFatal
+
+import org.apache.predictionio.data.storage.Model
+import org.apache.predictionio.data.storage.Models
+import org.apache.predictionio.data.storage.StorageClientConfig
+
+import com.amazonaws.services.s3.AmazonS3
+import com.amazonaws.services.s3.model.DeleteObjectRequest
+import com.amazonaws.services.s3.model.GetObjectRequest
+import com.amazonaws.services.s3.model.ObjectMetadata
+import com.amazonaws.services.s3.model.PutObjectRequest
+import com.amazonaws.services.s3.model.S3Object
+import com.google.common.io.ByteStreams
+
+import grizzled.slf4j.Logging
+
+/** [[Models]] implementation that stores each model as one S3 object.
+ *
+ * Objects live in the bucket named by the `BUCKET_NAME` property, under the
+ * key `<BASE_PATH>/<prefix><id>` (or `<prefix><id>` when `BASE_PATH` is not
+ * set). Non-fatal S3 errors are logged and never rethrown: `insert` and
+ * `delete` are best-effort, and `get` returns `None`.
+ *
+ * @param s3Client client used for every S3 request
+ * @param config storage source configuration; `BUCKET_NAME` and `BASE_PATH`
+ *               are read from its properties
+ * @param prefix string prepended to each model id to form the object name
+ */
+class S3Models(s3Client: AmazonS3, config: StorageClientConfig, prefix: String)
+  extends Models with Logging {
+
+  /** Uploads `i.models` as the object for `i.id`.
+   *
+   * Non-fatal upload errors are logged and not rethrown.
+   */
+  def insert(i: Model): Unit = {
+    def putModel(bucketName: String, key: String): Option[Model] = {
+      val data = i.models
+      val metadata: ObjectMetadata = new ObjectMetadata()
+      // Declaring the length up front lets the SDK stream the bytes instead
+      // of buffering the input stream to measure it.
+      metadata.setContentLength(data.length)
+      val req = new PutObjectRequest(bucketName, key, new ByteArrayInputStream(data), metadata)
+      try {
+        s3Client.putObject(req)
+      } catch {
+        case NonFatal(e) => error(s"Failed to insert a model to s3://${bucketName}/${key}", e)
+      }
+      None
+    }
+    doAction(i.id, putModel)
+  }
+
+  /** Downloads the object for `id`.
+   *
+   * @return the model, or `None` if the bucket is not configured or any
+   *         non-fatal error occurs (e.g. the object does not exist)
+   */
+  def get(id: String): Option[Model] = {
+    def getModel(bucketName: String, key: String): Option[Model] = {
+      // getObject sits inside the try so that a missing object or a failed
+      // request is logged and mapped to None, just like a failed read.
+      try {
+        val s3object: S3Object = s3Client.getObject(new GetObjectRequest(bucketName, key))
+        val is = s3object.getObjectContent
+        try {
+          Some(Model(
+            id = id,
+            models = ByteStreams.toByteArray(is)))
+        } finally {
+          is.close()
+        }
+      } catch {
+        case NonFatal(e) =>
+          error(s"Failed to get a model from s3://${bucketName}/${key}", e)
+          None
+      }
+    }
+    doAction(id, getModel)
+  }
+
+  /** Deletes the object for `id`.
+   *
+   * Non-fatal delete errors are logged and not rethrown.
+   */
+  def delete(id: String): Unit = {
+    def deleteModel(bucketName: String, key: String): Option[Model] = {
+      try {
+        s3Client.deleteObject(new DeleteObjectRequest(bucketName, key))
+      } catch {
+        case NonFatal(e) => error(s"Failed to delete s3://${bucketName}/${key}", e)
+      }
+      None
+    }
+    doAction(id, deleteModel)
+  }
+
+  /** Resolves the bucket and object key for `id` and runs `action` on them.
+   *
+   * @param id model id; the key is `<BASE_PATH>/<prefix><id>`, or
+   *           `<prefix><id>` when `BASE_PATH` is absent
+   * @param action receives `(bucketName, key)`
+   * @return the result of `action`, or `None` (after logging an error) when
+   *         `BUCKET_NAME` is not configured
+   */
+  def doAction(id: String, action: (String, String) => Option[Model]): Option[Model] = {
+    config.properties.get("BUCKET_NAME") match {
+      case Some(bucketName) =>
+        val key = config.properties.get("BASE_PATH") match {
+          case Some(basePath) => s"${basePath}/${prefix}${id}"
+          case None => s"${prefix}${id}"
+        }
+        action(bucketName, key)
+      case None =>
+        error("S3 bucket name is not configured (BUCKET_NAME is missing).")
+        None
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/3330013b/storage/s3/src/main/scala/org/apache/predictionio/data/storage/s3/StorageClient.scala
----------------------------------------------------------------------
diff --git a/storage/s3/src/main/scala/org/apache/predictionio/data/storage/s3/StorageClient.scala b/storage/s3/src/main/scala/org/apache/predictionio/data/storage/s3/StorageClient.scala
new file mode 100644
index 0000000..c37bf3d
--- /dev/null
+++ b/storage/s3/src/main/scala/org/apache/predictionio/data/storage/s3/StorageClient.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.s3
+
+import org.apache.predictionio.data.storage.BaseStorageClient
+import org.apache.predictionio.data.storage.StorageClientConfig
+
+import com.amazonaws.auth.profile.ProfileCredentialsProvider
+import com.amazonaws.client.builder.AwsClientBuilder
+import com.amazonaws.services.s3.AmazonS3
+import com.amazonaws.services.s3.AmazonS3ClientBuilder
+
+import grizzled.slf4j.Logging
+
+/** Storage client exposing an [[AmazonS3]] client built from the source's
+ * configuration properties.
+ *
+ * Recognized properties (all optional):
+ *  - `ENDPOINT`: custom service endpoint; must be paired with `REGION`.
+ *  - `REGION`: AWS region; together with `ENDPOINT` it is the signing region.
+ *  - `DISABLE_CHUNKED_ENCODING`: `true` (case-insensitive) disables chunked
+ *    encoding; any other value, or no value, keeps the SDK default.
+ *
+ * Credentials come from a default-constructed [[ProfileCredentialsProvider]].
+ *
+ * @throws IllegalArgumentException if `ENDPOINT` is set without `REGION`
+ */
+class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
+  with Logging {
+  override val prefix = "S3"
+
+  val client: AmazonS3 = {
+    val builder = AmazonS3ClientBuilder.standard()
+      .withCredentials(new ProfileCredentialsProvider())
+
+    // Every combination is matched explicitly; a non-exhaustive match here
+    // would throw a MatchError whenever ENDPOINT and/or REGION is omitted.
+    (config.properties.get("ENDPOINT"), config.properties.get("REGION")) match {
+      case (Some(endpoint), Some(region)) =>
+        builder.withEndpointConfiguration(
+          new AwsClientBuilder.EndpointConfiguration(endpoint, region))
+      case (None, Some(region)) =>
+        builder.withRegion(region)
+      case (Some(endpoint), None) =>
+        throw new IllegalArgumentException(
+          s"S3 ENDPOINT ${endpoint} is configured without a REGION; " +
+            "REGION is required as the signing region for a custom endpoint.")
+      case (None, None) =>
+        // No explicit region: the SDK resolves one itself when build() runs
+        // (presumably through its default region provider chain).
+    }
+
+    // Only an explicit "true" disables chunked encoding; an absent property
+    // or any other value leaves the builder untouched.
+    val disableChunked = config.properties.get("DISABLE_CHUNKED_ENCODING")
+      .exists(_.equalsIgnoreCase("true"))
+    if (disableChunked) {
+      builder.disableChunkedEncoding()
+    }
+
+    builder.build()
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/3330013b/storage/s3/src/main/scala/org/apache/predictionio/data/storage/s3/package.scala
----------------------------------------------------------------------
diff --git a/storage/s3/src/main/scala/org/apache/predictionio/data/storage/s3/package.scala b/storage/s3/src/main/scala/org/apache/predictionio/data/storage/s3/package.scala
new file mode 100644
index 0000000..32845cf
--- /dev/null
+++ b/storage/s3/src/main/scala/org/apache/predictionio/data/storage/s3/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.data.storage + +/** AWS S3 implementation of storage traits, supporting model data only. + * + * Models are stored in the bucket named by the `BUCKET_NAME` source + * property, optionally under the `BASE_PATH` key prefix. The underlying S3 + * client is configured by the `ENDPOINT`, `REGION` and + * `DISABLE_CHUNKED_ENCODING` source properties. + * + * @group Implementation + */ +package object s3 {} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/3330013b/tests/Dockerfile ---------------------------------------------------------------------- diff --git a/tests/Dockerfile b/tests/Dockerfile index f7dbd68..45e4bd5 100644 --- a/tests/Dockerfile +++ b/tests/Dockerfile @@ -42,11 +42,13 @@ COPY docker-files/init.sh init.sh COPY docker-files/env-conf/hbase-site.xml ${PIO_HOME}/conf/hbase-site.xml COPY docker-files/env-conf/pio-env.sh ${PIO_HOME}/conf/pio-env.sh COPY docker-files/pgpass /root/.pgpass +COPY docker-files/awscredentials /root/.aws/credentials RUN chmod 600 /root/.pgpass # Python RUN pip install python-dateutil RUN pip install pytz +RUN pip install awscli # Default repositories setup ENV PIO_STORAGE_REPOSITORIES_METADATA_SOURCE PGSQL http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/3330013b/tests/docker-compose.yml ---------------------------------------------------------------------- diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index ac1d91d..e0eda34 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -28,12 +28,18 @@ services: POSTGRES_USER: pio POSTGRES_PASSWORD: pio POSTGRES_INITDB_ARGS: --encoding=UTF8 + localstack:
+ image: atlassianlabs/localstack + environment: + - SERVICES=s3 + - DEBUG=1 pio-testing: image: predictionio/pio-testing:latest depends_on: - elasticsearch - hbase - postgres + - localstack volumes: - ~/.ivy2:/root/.ivy2 - ~/.sbt:/root/.sbt http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/3330013b/tests/docker-files/awscredentials ---------------------------------------------------------------------- diff --git a/tests/docker-files/awscredentials b/tests/docker-files/awscredentials new file mode 100644 index 0000000..039c36f --- /dev/null +++ b/tests/docker-files/awscredentials @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +[default] +aws_access_key_id = foo +aws_secret_access_key = foo http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/3330013b/tests/docker-files/env-conf/pio-env.sh ---------------------------------------------------------------------- diff --git a/tests/docker-files/env-conf/pio-env.sh b/tests/docker-files/env-conf/pio-env.sh index 72a8f12..276b052 100644 --- a/tests/docker-files/env-conf/pio-env.sh +++ b/tests/docker-files/env-conf/pio-env.sh @@ -111,3 +111,12 @@ PIO_STORAGE_SOURCES_HBASE_TYPE=hbase # HDFS config PIO_STORAGE_SOURCES_HDFS_TYPE=hdfs PIO_STORAGE_SOURCES_HDFS_PATH=/hdfs_models + +# AWS S3 Example +PIO_STORAGE_SOURCES_S3_TYPE=s3 +PIO_STORAGE_SOURCES_S3_ENDPOINT=http://localstack:4572 +PIO_STORAGE_SOURCES_S3_REGION=us-east-1 +PIO_STORAGE_SOURCES_S3_BUCKET_NAME=pio_bucket +PIO_STORAGE_SOURCES_S3_BASE_PATH=pio_model +PIO_STORAGE_SOURCES_S3_DISABLE_CHUNKED_ENCODING=true + http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/3330013b/tests/docker-files/init.sh ---------------------------------------------------------------------- diff --git a/tests/docker-files/init.sh b/tests/docker-files/init.sh index fc12ffe..0cde41d 100755 --- a/tests/docker-files/init.sh +++ b/tests/docker-files/init.sh @@ -20,4 +20,8 @@ set -e export PYTHONPATH=$PIO_HOME/tests:$PYTHONPATH echo "Sleeping $SLEEP_TIME seconds for all services to be ready..." 
sleep $SLEEP_TIME + +# create S3 bucket in localstack +aws --endpoint-url=http://localstack:4572 --region=us-east-1 s3 mb s3://pio_bucket + eval $@ http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/3330013b/tests/run_docker.sh ---------------------------------------------------------------------- diff --git a/tests/run_docker.sh b/tests/run_docker.sh index ad7e189..8200785 100755 --- a/tests/run_docker.sh +++ b/tests/run_docker.sh @@ -20,7 +20,7 @@ USAGE=$"Usage: run_docker <meta> <event> <model> <command> Where: meta = [PGSQL,ELASTICSEARCH] event = [PGSQL,HBASE,ELASTICSEARCH] - model = [PGSQL,LOCALFS,HDFS] + model = [PGSQL,LOCALFS,HDFS,S3] command = command to run in the container" if ! [[ "$1" =~ ^(PGSQL|ELASTICSEARCH)$ ]]; then @@ -37,7 +37,7 @@ fi EVENT="$1" shift -if ! [[ "$1" =~ ^(PGSQL|LOCALFS|HDFS)$ ]]; then +if ! [[ "$1" =~ ^(PGSQL|LOCALFS|HDFS|S3)$ ]]; then echo "$USAGE" exit 1 fi
