This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 92abdf636c5 [Playground] Resolve issue with SCIO examples failing on
start due to a timeout (#24946)
92abdf636c5 is described below
commit 92abdf636c52cce7c067593000bc846f507b0595
Author: Timur Sultanov <[email protected]>
AuthorDate: Thu Mar 23 05:37:53 2023 +0400
[Playground] Resolve issue with SCIO examples failing on start due to a
timeout (#24946)
* Initialize SBT cache during SCIO playground container build
Run the sbt tool during the container build so that it downloads Scala
dependencies from Maven at build time, instead of waiting for all dependencies
to download during the first run of examples in the container
* Fix issue with reading of GRPC_TIMEOUT environment variables in CI/CD
scripts
* Fix cleanup of execution environment for Scala examples
* Fix panic in preparers when an empty file is passed
* Use better name for SCIO project directory
* Run "sbt compile" during container build to fetch all Scala dependencies
* Disable forking JVM in SBT to significantly reduce memory usage
* Impose memory limits on local deployments of SCIO runner container to
better imitate real deployments
* Fine-tune Java GC to improve performance and memory usage of SCIO examples
* Remove large blobs of text from common_test.go
* Add `sbt` to the list of development dependencies
* Clarify running of backend tests in Playground
* Clarify local running of backend
* Improve consistency in code blocks in backend Readme
* Fixing trailing whitespace
* Update playground/backend/README.md
Co-authored-by: Danny McCormick <[email protected]>
* Update playground/backend/internal/utils/preparers_utils_test.go
Co-authored-by: Danny McCormick <[email protected]>
---------
Co-authored-by: Danny McCormick <[email protected]>
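
Note on "Fix panic in preparers when an empty file is passed": the fix comes down to checking the result of `FindStringSubmatch` before indexing into it, since it returns nil when nothing matches. A minimal sketch of that guard follows. The names `classPattern` and `findClassName` are hypothetical and the pattern is an assumption for illustration; neither is taken from the Beam source.

```go
package main

import (
	"fmt"
	"regexp"
)

// classPattern is an assumed, simplified pattern for illustration only;
// it is not the pattern used by the Playground backend.
const classPattern = `public class (\w+)`

// findClassName returns the first capture group of pattern in code,
// or an error when the pattern does not match (e.g. for empty input).
func findClassName(code, pattern string) (string, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return "", err
	}
	match := re.FindStringSubmatch(code)
	// FindStringSubmatch returns nil on no match, so indexing match[1]
	// without this check would panic.
	if len(match) < 2 {
		return "", fmt.Errorf("unable to find class name")
	}
	return match[1], nil
}

func main() {
	name, err := findClassName("public class MinimalWordCount {}", classPattern)
	fmt.Println(name, err)

	_, err = findClassName("", classPattern)
	fmt.Println(err)
}
```

Returning an error rather than panicking lets the caller report a bad or empty source file as an ordinary failure.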
---
playground/README.md | 35 ++++++-
playground/backend/README.md | 61 +++++++++--
playground/backend/containers/scio/Dockerfile | 17 +++-
.../internal/code_processing/code_processing.go | 8 +-
playground/backend/internal/fs_tool/fs.go | 36 +------
playground/backend/internal/fs_tool/fs_test.go | 6 +-
.../setup_tools/life_cycle/life_cycle_setuper.go | 49 +++++----
.../life_cycle/life_cycle_setuper_test.go | 8 +-
playground/backend/internal/utils/common.go | 51 ++++++++--
playground/backend/internal/utils/common_test.go | 87 ----------------
.../backend/internal/utils/preparers_utils.go | 7 +-
.../backend/internal/utils/preparers_utils_test.go | 61 +++++++++++
.../internal/utils/test_data/JavaFileName.java | 23 +++++
.../backend/internal/utils/test_data/wordcount.py | 111 +++++++++++++++++++++
playground/backend/new_scio_project.sh | 4 +-
playground/docker-compose.local.yaml | 4 +
16 files changed, 384 insertions(+), 184 deletions(-)
diff --git a/playground/README.md b/playground/README.md
index 958e58ce1d7..2a4c777f03c 100644
--- a/playground/README.md
+++ b/playground/README.md
@@ -35,11 +35,44 @@ The following requirements are needed for development,
testing, and deploying.
- [Docker Compose](https://docs.docker.com/compose/install/)
- [gcloud CLI](https://cloud.google.com/sdk/docs/install)
- [gcloud Beta
Commands](https://cloud.google.com/sdk/gcloud/reference/components/install)
-- [Cloud Datastore
Emulator](https://cloud.google.com/sdk/gcloud/reference/components/install)
+- [Cloud Datastore
Emulator](https://cloud.google.com/datastore/docs/tools/datastore-emulator)
+- [sbt](https://www.scala-sbt.org/)
+
+### Google Cloud Shell Prerequisites Installation
+Google Cloud Shell already has most of the prerequisites installed. Only few
tools need to be installed separately
+
+#### Flutter
+```shell
+git config --global --add safe.directory /google/flutter
+flutter doctor
+```
+
+#### Protobuf
+```shell
+go install google.golang.org/protobuf/cmd/[email protected]
+go install google.golang.org/grpc/cmd/[email protected]
+dart pub global activate protoc_plugin
+npm install -g @bufbuild/buf
+```
+#### sbt
+```shell
+echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list
+echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | sudo tee /etc/apt/sources.list.d/sbt_old.list
+curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo -H gpg --no-default-keyring --keyring gnupg-ring:/etc/apt/trusted.gpg.d/scalasbt-release.gpg --import
+sudo chmod 644 /etc/apt/trusted.gpg.d/scalasbt-release.gpg
+sudo apt-get update
+sudo apt-get install sbt
+```
+### Additional tools
+Google Cloud shell machines do not have `netcat` and `lsof` preinstalled.
Install them using:
+```shell
+sudo apt install netcat lsof
+```
# Available Gradle Tasks
## Perform overall pre-commit checks
+> **Google Cloud Shell note:** run `unset GOOGLE_CLOUD_PROJECT` before running
tests so they would use locally running datastore emulator.
```
cd beam
diff --git a/playground/backend/README.md b/playground/backend/README.md
index 1b3ad66f2b4..6659e948d75 100644
--- a/playground/backend/README.md
+++ b/playground/backend/README.md
@@ -27,7 +27,7 @@ no setup.
## Getting Started
-See [playground/README.md](../README.md) for details on requirements and setup.
+See [playground/README.md](../README.md) for details on installing development
dependencies.
This section describes what is needed to run the backend application.
@@ -35,35 +35,76 @@ This section describes what is needed to run the backend
application.
- Set up environment variables to run the backend locally
- Running the backend via Docker
-### Go commands to run/test application locally
+## Go commands to run/test application locally
+### Prerequisite
+
+> **Google Cloud Shell note:** `start_datastore_emulator.sh` script makes use
of `nc` and `lsof` commands which are not installed on Google Cloud Shell
machines. You can install them using `sudo apt install netcat lsof`.
+
+> **Google Cloud Shell note:** run `unset GOOGLE_CLOUD_PROJECT` before running
tests so they would use locally running datastore emulator.
+
+Start datastore emulator
+```shell
+bash start_datastore_emulator.sh
+```
+
+After you have finished running tests
+```shell
+bash stop_datastore_emulator.sh
+```
+
+### Run/build
Go to the backend directory:
```shell
-$ cd backend
+cd backend
```
-The following command is used to build and serve the backend locally:
+To run backend server on development machine without using docker you'll need
first to prepare a working directory anywhere outside of Beam source tree:
+```shell
+mkdir ~/path/to/workdir
+```
+and then copy `datasets/` and `configs/` and `logging.properties` from
[`playground/backend/`](/playground/backend/) directory:
+```shell
+cp -r {logging.properties,datasets/,configs/} ~/path/to/workdir
+```
+In case if you want to start backend for Go SDK you additionally will also
need to create a prepared mod dir and export an additional environment variable:
```shell
-$ go run ./cmd/server/server.go
+export PREPARED_MOD_DIR=~/path/to/workdir/prepared_folder
+SDK_TAG=2.44.0 bash ./containers/go/setup_sdk.sh $PREPARED_MOD_DIR
```
+The following command will build and serve the backend locally:
+
+```shell
+SERVER_PORT=<port> \
+BEAM_SDK=<beam_sdk_type> \
+APP_WORK_DIR=<path_to_workdir> \
+DATASTORE_EMULATOR_HOST=127.0.0.1:8888 \
+DATASTORE_PROJECT_ID=test \
+SDK_CONFIG=../sdks-emulator.yaml \
+go run ./cmd/server
+```
+
+where `<port>` should be the value of port on which you want to have the
backend server available; `<beam_sdk_type>` is a value of desired Beam SDK,
possible values are `SDK_UNSPECIFIED`, `SDK_JAVA`, `SDK_PYTHON`, `SDK_GO`,
`SDK_SCIO`; `<path_to_workdir>` should be set to path to your work dir, e.g.
`~/path/to/workdir`.
+
Run the following command to generate a release build file:
```shell
-$ go build ./cmd/server/server.go
+go build ./cmd/server/server.go
```
+### Test
Playground tests may be run using this command:
```shell
-$ go test ... -v
+go test ./... -v
```
The full list of commands can be found [here](https://pkg.go.dev/cmd/go).
-### Set up environment variables to run the backend locally
+## Set up environment variables to run the backend locally
These environment variables should be set to run the backend locally:
@@ -96,7 +137,7 @@ default value and there is no need to set them up to launch
locally:
- `PROPERTY_PATH` - is the application properties path (default value = `.`)
- `CACHE_REQUEST_TIMEOUT` - is the timeout to request data from cache (default
value = `5 sec`)
-### Application properties
+## Application properties
These properties are stored in `backend/properties.yaml` file:
@@ -106,7 +147,7 @@ These properties are stored in `backend/properties.yaml`
file:
- `removing_unused_snippets_cron` - is the cron expression for the scheduled
task to remove unused snippets.
- `removing_unused_snippets_days` - is the number of days after which a
snippet becomes unused.
-### Running the server app via Docker
+## Running the server app via Docker
To run the server using Docker images there are `Docker` files in the
`containers` folder for Java, Python and Go
languages. Each of them processes the corresponding SDK, so the backend with
Go SDK will work with Go
diff --git a/playground/backend/containers/scio/Dockerfile
b/playground/backend/containers/scio/Dockerfile
index 43feb4ad774..73d187ea964 100644
--- a/playground/backend/containers/scio/Dockerfile
+++ b/playground/backend/containers/scio/Dockerfile
@@ -48,6 +48,7 @@ COPY --from=build /go/src/playground/backend/configs
/opt/playground/backend/con
COPY --from=build /go/src/playground/backend/logging.properties
/opt/playground/backend/
COPY --from=build /go/src/playground/backend/new_scio_project.sh
/opt/playground/backend/
COPY --from=build
/go/src/playground/backend/internal/fs_tool/ExampleData.scala
/opt/playground/backend/
+RUN chmod +x /opt/playground/backend/new_scio_project.sh
# Install sbt
RUN echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | tee
/etc/apt/sources.list.d/sbt.list &&\
@@ -64,8 +65,6 @@ RUN mkdir /opt/mitmproxy &&\
mkdir /usr/local/share/ca-certificates/extra
COPY allow_list_proxy.py /opt/mitmproxy/
COPY allow_list.py /opt/mitmproxy/
-ENV HTTP_PROXY="http://127.0.0.1:8081"
-ENV HTTPS_PROXY="http://127.0.0.1:8081"
COPY src/properties.yaml /opt/playground/backend/properties.yaml
COPY entrypoint.sh /
@@ -90,4 +89,18 @@ RUN chown -R appuser:appgroup
/opt/playground/backend/executable_files/ \
# Switch to appuser
USER appuser
+# Let sbt download files from Maven
+RUN mkdir -p /tmp/sbt-initialize
+WORKDIR /tmp/sbt-initialize
+RUN /opt/playground/backend/new_scio_project.sh
+WORKDIR /tmp/sbt-initialize/scio
+RUN sbt "+compile"
+WORKDIR /
+RUN rm -r /tmp/sbt-initialize
+
+# Enable mitmproxy
+ENV HTTP_PROXY="http://127.0.0.1:8081"
+ENV HTTPS_PROXY="http://127.0.0.1:8081"
+ENV SBT_OPTS="-Xmx512M -XX:+UseG1GC -XX:+UseStringDeduplication"
+
ENTRYPOINT ["/entrypoint.sh"]
diff --git a/playground/backend/internal/code_processing/code_processing.go
b/playground/backend/internal/code_processing/code_processing.go
index b2e432c4f99..23e34240eaa 100644
--- a/playground/backend/internal/code_processing/code_processing.go
+++ b/playground/backend/internal/code_processing/code_processing.go
@@ -424,21 +424,21 @@ func readGraphFile(pipelineLifeCycleCtx, backgroundCtx
context.Context, cacheSer
case <-ticker.C:
if _, err := os.Stat(graphFilePath); err == nil {
ticker.Stop()
- graph, err := utils.ReadFile(pipelineId,
graphFilePath)
+ graph, err := os.ReadFile(graphFilePath)
if err != nil {
logger.Errorf("%s: Error during saving
graph to the file: %s", pipelineId, err.Error())
}
- _ = utils.SetToCache(backgroundCtx,
cacheService, pipelineId, cache.Graph, graph)
+ _ = utils.SetToCache(backgroundCtx,
cacheService, pipelineId, cache.Graph, string(graph))
}
// in case of timeout or cancel
case <-pipelineLifeCycleCtx.Done():
ticker.Stop()
if _, err := os.Stat(graphFilePath); err == nil {
- graph, err := utils.ReadFile(pipelineId,
graphFilePath)
+ graph, err := os.ReadFile(graphFilePath)
if err != nil {
logger.Errorf("%s: Error during saving
graph to the file: %s", pipelineId, err.Error())
}
- _ = utils.SetToCache(backgroundCtx,
cacheService, pipelineId, cache.Graph, graph)
+ _ = utils.SetToCache(backgroundCtx,
cacheService, pipelineId, cache.Graph, string(graph))
}
return
}
diff --git a/playground/backend/internal/fs_tool/fs.go
b/playground/backend/internal/fs_tool/fs.go
index d344a8afddb..f132e71c21d 100644
--- a/playground/backend/internal/fs_tool/fs.go
+++ b/playground/backend/internal/fs_tool/fs.go
@@ -18,12 +18,9 @@ package fs_tool
import (
"beam.apache.org/playground/backend/internal/logger"
"fmt"
- "io"
+ "github.com/google/uuid"
"io/fs"
"os"
- "path/filepath"
-
- "github.com/google/uuid"
pb "beam.apache.org/playground/backend/internal/api/v1"
"beam.apache.org/playground/backend/internal/db/entity"
@@ -114,37 +111,6 @@ func (lc *LifeCycle) CreateSourceCodeFiles(sources
[]entity.FileEntity) error {
return nil
}
-// CopyFile copies a file with fileName from sourceDir to destinationDir.
-func (lc *LifeCycle) CopyFile(fileName, sourceDir, destinationDir string)
error {
- absSourcePath := filepath.Join(sourceDir, fileName)
- absDestinationPath := filepath.Join(destinationDir, fileName)
- sourceFileStat, err := os.Stat(absSourcePath)
- if err != nil {
- return err
- }
-
- if !sourceFileStat.Mode().IsRegular() {
- return fmt.Errorf("%s is not a regular file", fileName)
- }
-
- sourceFile, err := os.Open(absSourcePath)
- if err != nil {
- return err
- }
- defer sourceFile.Close()
-
- destinationFile, err := os.Create(absDestinationPath)
- if err != nil {
- return err
- }
- defer destinationFile.Close()
- _, err = io.Copy(destinationFile, sourceFile)
- if err != nil {
- return err
- }
- return nil
-}
-
func (lc *LifeCycle) GetPreparerParameters() map[string]string {
if lc.emulatorMockCluster == nil {
return map[string]string{}
diff --git a/playground/backend/internal/fs_tool/fs_test.go
b/playground/backend/internal/fs_tool/fs_test.go
index 7e4cc36221f..4241fd9a486 100644
--- a/playground/backend/internal/fs_tool/fs_test.go
+++ b/playground/backend/internal/fs_tool/fs_test.go
@@ -135,11 +135,7 @@ func TestLifeCycle_CopyFile(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- l := &LifeCycle{
- folderGlobs: tt.fields.folderGlobs,
- Paths: tt.fields.Paths,
- }
- if err := l.CopyFile(tt.args.fileName,
tt.args.sourceDir, tt.args.destinationDir); (err != nil) != tt.wantErr {
+ if err :=
utils.CopyFilePreservingName(tt.args.fileName, tt.args.sourceDir,
tt.args.destinationDir); (err != nil) != tt.wantErr {
t.Errorf("CopyFile() error = %v, wantErr %v",
err, tt.wantErr)
}
})
diff --git
a/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper.go
b/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper.go
index 06aa2e2ddf8..878d73efb1c 100644
--- a/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper.go
+++ b/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper.go
@@ -32,7 +32,7 @@ import (
"beam.apache.org/playground/backend/internal/db/entity"
"beam.apache.org/playground/backend/internal/fs_tool"
"beam.apache.org/playground/backend/internal/logger"
- "beam.apache.org/playground/backend/internal/utils"
+ utils "beam.apache.org/playground/backend/internal/utils"
)
const (
@@ -41,13 +41,11 @@ const (
javaLogFilePlaceholder = "{logFilePath}"
goModFileName = "go.mod"
goSumFileName = "go.sum"
- scioProjectName = "y"
+ bashCmd = "bash"
+ scioProjectName = "scio"
scioProjectPath = scioProjectName + "/src/main/scala/" +
scioProjectName
logFileName = "logs.log"
defaultExampleInSbt = "WordCount.scala"
- shCmd = "sh"
- rmCmd = "rm"
- cpCmd = "cp"
scioProject = "new_scio_project.sh"
scioCommonConstants = "ExampleData.scala"
)
@@ -128,11 +126,11 @@ func Setup(sdk pb.Sdk, sources []entity.FileEntity,
pipelineId uuid.UUID, workin
// prepareGoFiles prepares file for Go environment.
// Copy go.mod and go.sum file from /path/to/preparedModDir to
/path/to/workingDir/pipelinesFolder/{pipelineId}
func prepareGoFiles(lc *fs_tool.LifeCycle, preparedModDir string, pipelineId
uuid.UUID) error {
- if err := lc.CopyFile(goModFileName, preparedModDir,
lc.Paths.AbsoluteBaseFolderPath); err != nil {
+ if err := utils.CopyFilePreservingName(goModFileName, preparedModDir,
lc.Paths.AbsoluteBaseFolderPath); err != nil {
logger.Errorf("%s: error during copying %s file: %s\n",
pipelineId, goModFileName, err.Error())
return err
}
- if err := lc.CopyFile(goSumFileName, preparedModDir,
lc.Paths.AbsoluteBaseFolderPath); err != nil {
+ if err := utils.CopyFilePreservingName(goSumFileName, preparedModDir,
lc.Paths.AbsoluteBaseFolderPath); err != nil {
logger.Errorf("%s: error during copying %s file: %s\n",
pipelineId, goSumFileName, err.Error())
return err
}
@@ -144,7 +142,7 @@ func prepareGoFiles(lc *fs_tool.LifeCycle, preparedModDir
string, pipelineId uui
//
// and update this file according to pipeline.
func prepareJavaFiles(lc *fs_tool.LifeCycle, workingDir string, pipelineId
uuid.UUID) error {
- err := lc.CopyFile(javaLogConfigFileName, workingDir,
lc.Paths.AbsoluteBaseFolderPath)
+ err := utils.CopyFilePreservingName(javaLogConfigFileName, workingDir,
lc.Paths.AbsoluteBaseFolderPath)
if err != nil {
logger.Errorf("%s: error during copying logging.properties
file: %s\n", pipelineId, err.Error())
return err
@@ -194,7 +192,7 @@ func updateJavaLogConfigFile(paths fs_tool.LifeCyclePaths)
error {
}
func prepareSbtFiles(lc *fs_tool.LifeCycle, pipelineFolder string, workingDir
string) (*fs_tool.LifeCycle, error) {
- cmd := exec.Command(shCmd, filepath.Join(workingDir, scioProject))
+ cmd := exec.Command(bashCmd, filepath.Join(workingDir, scioProject))
cmd.Dir = pipelineFolder
_, err := cmd.Output()
if err != nil {
@@ -210,30 +208,29 @@ func prepareSbtFiles(lc *fs_tool.LifeCycle,
pipelineFolder string, workingDir st
projectFolder, _ := filepath.Abs(filepath.Join(pipelineFolder,
scioProjectName))
executableName := lc.Paths.ExecutableName
- _, err = exec.Command(rmCmd, filepath.Join(absFileFolderPath,
defaultExampleInSbt)).Output()
+ err = os.Remove(filepath.Join(absFileFolderPath, defaultExampleInSbt))
if err != nil {
return lc, err
}
- _, err = exec.Command(cpCmd, filepath.Join(workingDir,
scioCommonConstants), absFileFolderPath).Output()
+ err = utils.CopyFilePreservingName(scioCommonConstants, workingDir,
absFileFolderPath)
if err != nil {
return lc, err
}
- lc = &fs_tool.LifeCycle{
- Paths: fs_tool.LifeCyclePaths{
- SourceFileName: fileName,
- AbsoluteSourceFileFolderPath: absFileFolderPath,
- AbsoluteSourceFilePath: absFilePath,
- ExecutableFileName: fileName,
- AbsoluteExecutableFileFolderPath: absFileFolderPath,
- AbsoluteExecutableFilePath: absFilePath,
- AbsoluteBaseFolderPath: absFileFolderPath,
- AbsoluteLogFilePath: absLogFilePath,
- AbsoluteGraphFilePath: absGraphFilePath,
- ProjectDir: projectFolder,
- },
- }
- lc.Paths.ExecutableName = executableName
+ lc.Paths = fs_tool.LifeCyclePaths{
+ SourceFileName: fileName,
+ AbsoluteSourceFileFolderPath: absFileFolderPath,
+ AbsoluteSourceFilePath: absFilePath,
+ ExecutableFileName: fileName,
+ AbsoluteExecutableFileFolderPath: absFileFolderPath,
+ AbsoluteExecutableFilePath: absFilePath,
+ AbsoluteBaseFolderPath: absFileFolderPath,
+ AbsoluteLogFilePath: absLogFilePath,
+ AbsoluteGraphFilePath: absGraphFilePath,
+ ProjectDir: projectFolder,
+ ExecutableName: executableName,
+ }
+
return lc, nil
}
diff --git
a/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper_test.go
b/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper_test.go
index 74776f52729..e709e87ea7c 100644
---
a/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper_test.go
+++
b/playground/backend/internal/setup_tools/life_cycle/life_cycle_setuper_test.go
@@ -285,7 +285,13 @@ func TestSetup(t *testing.T) {
pipelinesFolder: pipelinesFolder,
},
prep: func() error {
- _, err := os.Create(filepath.Join(workingDir,
scioCommonConstants))
+ sourceScioShFile :=
"../../../new_scio_project.sh"
+ scioShFile := filepath.Join(workingDir,
scioProject)
+ err := utils.CopyFile(sourceScioShFile,
scioShFile)
+ if err != nil {
+ return err
+ }
+ _, err = os.Create(filepath.Join(workingDir,
scioCommonConstants))
if err != nil {
return err
}
diff --git a/playground/backend/internal/utils/common.go
b/playground/backend/internal/utils/common.go
index 7af60cba6f6..9720ed5df9e 100644
--- a/playground/backend/internal/utils/common.go
+++ b/playground/backend/internal/utils/common.go
@@ -17,9 +17,12 @@ package utils
import (
"beam.apache.org/playground/backend/internal/logger"
- "github.com/google/uuid"
+ "fmt"
"gopkg.in/yaml.v3"
+ "io"
"io/ioutil"
+ "os"
+ "path/filepath"
"regexp"
)
@@ -28,16 +31,6 @@ func ReduceWhiteSpacesToSinge(s string) string {
return re.ReplaceAllString(s, " ")
}
-// ReadFile reads from file and returns string.
-func ReadFile(pipelineId uuid.UUID, path string) (string, error) {
- content, err := ioutil.ReadFile(path)
- if err != nil {
- logger.Errorf("%s: ReadFile(): error during reading from a
file: %s", pipelineId, err.Error())
- return "", err
- }
- return string(content), nil
-}
-
// ReadYamlFile reads from a yaml file.
func ReadYamlFile(filename string, out interface{}) error {
buf, err := ioutil.ReadFile(filename)
@@ -51,3 +44,39 @@ func ReadYamlFile(filename string, out interface{}) error {
}
return nil
}
+
+// CopyFilePreservingName copies a file with fileName from sourceDir to
destinationDir.
+func CopyFilePreservingName(fileName, sourceDir, destinationDir string) error {
+ absSourcePath := filepath.Join(sourceDir, fileName)
+ absDestinationPath := filepath.Join(destinationDir, fileName)
+ return CopyFile(absSourcePath, absDestinationPath)
+}
+
+// CopyFile copies a file from sourcePath to destinationPath
+func CopyFile(sourcePath, destinationPath string) error {
+ sourceFileStat, err := os.Stat(sourcePath)
+ if err != nil {
+ return err
+ }
+
+ if !sourceFileStat.Mode().IsRegular() {
+ return fmt.Errorf("%s is not a regular file", sourcePath)
+ }
+
+ sourceFile, err := os.Open(sourcePath)
+ if err != nil {
+ return err
+ }
+ defer sourceFile.Close()
+
+ destinationFile, err := os.Create(destinationPath)
+ if err != nil {
+ return err
+ }
+ defer destinationFile.Close()
+ _, err = io.Copy(destinationFile, sourceFile)
+ if err != nil {
+ return err
+ }
+ return nil
+}
diff --git a/playground/backend/internal/utils/common_test.go
b/playground/backend/internal/utils/common_test.go
index b62c8d3fb5f..7303582f101 100644
--- a/playground/backend/internal/utils/common_test.go
+++ b/playground/backend/internal/utils/common_test.go
@@ -16,52 +16,9 @@
package utils
import (
- "fmt"
- "github.com/google/uuid"
- "os"
- "path/filepath"
"testing"
)
-const (
- sourceDir = "sourceDir"
- fileName = "file.txt"
- fileContent = "content"
- javaFileName = "javaFileName.java"
- pythonExampleName = "wordCount.py"
- wordCountPython = "import argparse\nimport logging\nimport
re\n\nimport apache_beam as beam\nfrom apache_beam.io import ReadFromText\nfrom
apache_beam.io import WriteToText\nfrom apache_beam.options.pipeline_options
import PipelineOptions\nfrom apache_beam.options.pipeline_options import
SetupOptions\n\n\nclass WordExtractingDoFn(beam.DoFn):\n \"\"\"Parse each line
of input text into words.\"\"\"\n def process(self, element):\n
\"\"\"Returns an iterator over the words of this ele [...]
- javaCode = "package org.apache.beam.examples;\n\n//
beam-playground:\n// name: MinimalWordCount\n// description: An example
that counts words in Shakespeare's works.\n// multifile: false\n//
default_example: true\n// context_line: 71\n// categories:\n// -
Combiners\n// - Filtering\n// - IO\n// - Core Transforms\n// -
Quickstart\n\nimport java.util.Arrays;\nimport
org.apache.beam.sdk.Pipeline;\nimport org.apache.beam.sdk.io.TextIO;\nimport
org.apa [...]
- filePermission = 0600
- fullPermission = 0755
-)
-
-func TestMain(m *testing.M) {
- err := setup()
- if err != nil {
- panic(fmt.Errorf("error during test setup: %s", err.Error()))
- }
- defer teardown()
- m.Run()
-}
-
-func setup() error {
- err := os.Mkdir(sourceDir, fullPermission)
- if err != nil {
- return err
- }
- filePath := filepath.Join(sourceDir, fileName)
- err = os.WriteFile(filePath, []byte(fileContent), filePermission)
- javaFilePath := filepath.Join(sourceDir, javaFileName)
- err = os.WriteFile(javaFilePath, []byte(javaCode), filePermission)
- wordCountPythonPath := filepath.Join(sourceDir, pythonExampleName)
- err = os.WriteFile(wordCountPythonPath, []byte(wordCountPython),
filePermission)
- return err
-}
-
-func teardown() error {
- return os.RemoveAll(sourceDir)
-}
-
func TestReduceWhiteSpacesToSinge(t *testing.T) {
type args struct {
s string
@@ -82,47 +39,3 @@ func TestReduceWhiteSpacesToSinge(t *testing.T) {
})
}
}
-
-func TestReadFile(t *testing.T) {
- type args struct {
- pipelineId uuid.UUID
- path string
- }
- tests := []struct {
- name string
- args args
- want string
- wantErr bool
- }{
- {
- name: "Read from existing file",
- args: args{
- pipelineId: uuid.New(),
- path: filepath.Join(sourceDir, fileName),
- },
- want: fileContent,
- wantErr: false,
- },
- {
- name: "Read from non-existent file",
- args: args{
- pipelineId: uuid.New(),
- path: filepath.Join(sourceDir,
"non-existent_file.txt"),
- },
- want: "",
- wantErr: true,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- got, err := ReadFile(tt.args.pipelineId, tt.args.path)
- if (err != nil) != tt.wantErr {
- t.Errorf("ReadFile() error = %v, wantErr %v",
err, tt.wantErr)
- return
- }
- if got != tt.want {
- t.Errorf("ReadFile() got = %v, want %v", got,
tt.want)
- }
- })
- }
-}
diff --git a/playground/backend/internal/utils/preparers_utils.go
b/playground/backend/internal/utils/preparers_utils.go
index 7f9867d6f63..5a446ceaaee 100644
--- a/playground/backend/internal/utils/preparers_utils.go
+++ b/playground/backend/internal/utils/preparers_utils.go
@@ -148,7 +148,12 @@ func GetPublicClassName(filePath, pattern string) (string,
error) {
return "", err
}
re := regexp.MustCompile(pattern)
- className := re.FindStringSubmatch(string(code))[1]
+ classNameMatch := re.FindStringSubmatch(string(code))
+ if len(classNameMatch) == 0 {
+ return "", errors.New(fmt.Sprintf("unable to find main class name in file %s", filePath))
+ }
+
+ className := classNameMatch[1]
return className, err
}
diff --git a/playground/backend/internal/utils/preparers_utils_test.go
b/playground/backend/internal/utils/preparers_utils_test.go
index 498b2d964e5..72f0eab5cb4 100644
--- a/playground/backend/internal/utils/preparers_utils_test.go
+++ b/playground/backend/internal/utils/preparers_utils_test.go
@@ -27,6 +27,58 @@ import (
"testing"
)
+const (
+ sourceDir = "sourceDir"
+ fileName = "file.txt"
+ fileContent = "content"
+ testDataDir = "test_data"
+ javaFileName = "JavaFileName.java"
+ emptyFileName = "emptyFile.java"
+ pythonExampleName = "wordcount.py"
+ filePermission = 0600
+ fullPermission = 0755
+)
+
+func TestMain(m *testing.M) {
+ err := setup()
+ if err != nil {
+ panic(fmt.Errorf("error during test setup: %s", err.Error()))
+ }
+ defer teardown()
+ m.Run()
+}
+
+func setup() error {
+ err := os.Mkdir(sourceDir, fullPermission)
+ if err != nil {
+ return err
+ }
+ filePath := filepath.Join(sourceDir, fileName)
+ err = os.WriteFile(filePath, []byte(fileContent), filePermission)
+ if err != nil {
+ return err
+ }
+ sourceJavaFilePath := filepath.Join(testDataDir, javaFileName)
+ javaFilePath := filepath.Join(sourceDir, javaFileName)
+ err = CopyFile(sourceJavaFilePath, javaFilePath)
+ if err != nil {
+ return err
+ }
+ emptyFilePath := filepath.Join(sourceDir, emptyFileName)
+ err = os.WriteFile(emptyFilePath, []byte(""), filePermission)
+ if err != nil {
+ return err
+ }
+ sourceWordCountPythonPath := filepath.Join(testDataDir,
pythonExampleName)
+ wordCountPythonPath := filepath.Join(sourceDir, pythonExampleName)
+ err = CopyFile(sourceWordCountPythonPath, wordCountPythonPath)
+ return err
+}
+
+func teardown() error {
+ return os.RemoveAll(sourceDir)
+}
+
func TestSpacesToEqualsOption(t *testing.T) {
type args struct {
pipelineOptions string
@@ -177,6 +229,15 @@ func TestGetPublicClassName(t *testing.T) {
want: "MinimalWordCount",
wantErr: false,
},
+ {
+ name: "Get public class name from empty file",
+ args: args{
+ filePath: filepath.Join(sourceDir,
emptyFileName),
+ pattern: javaPublicClassNamePattern,
+ },
+ want: "",
+ wantErr: true,
+ },
{
name: "Get public class name from non-existent file",
args: args{
diff --git a/playground/backend/internal/utils/test_data/JavaFileName.java
b/playground/backend/internal/utils/test_data/JavaFileName.java
new file mode 100644
index 00000000000..9eb8e0e54cf
--- /dev/null
+++ b/playground/backend/internal/utils/test_data/JavaFileName.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples;
+
+public class MinimalWordCount {
+ public static void main(String[] args) {
+ }
+}
diff --git a/playground/backend/internal/utils/test_data/wordcount.py
b/playground/backend/internal/utils/test_data/wordcount.py
new file mode 100644
index 00000000000..f4e32c902b0
--- /dev/null
+++ b/playground/backend/internal/utils/test_data/wordcount.py
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A word-counting workflow."""
+
+# pytype: skip-file
+
+# beam-playground:
+# name: WordCount
+# description: An example that counts words in Shakespeare's works.
+# multifile: false
+# pipeline_options: --output output.txt
+# context_line: 44
+# categories:
+# - Combiners
+# - Options
+# - Quickstart
+# complexity: MEDIUM
+# tags:
+# - options
+# - count
+# - combine
+# - strings
+
+import argparse
+import logging
+import re
+
+import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+
+
+class WordExtractingDoFn(beam.DoFn):
+ """Parse each line of input text into words."""
+ def process(self, element):
+ """Returns an iterator over the words of this element.
+
+ The element is a line of text. If the line is blank, note that, too.
+
+ Args:
+ element: the element being processed
+
+ Returns:
+ The processed element.
+ """
+ return re.findall(r'[\w\']+', element, re.UNICODE)
+
+
+def run(argv=None, save_main_session=True):
+ """Main entry point; defines and runs the wordcount pipeline."""
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '--input',
+ dest='input',
+ default='gs://dataflow-samples/shakespeare/kinglear.txt',
+ help='Input file to process.')
+ parser.add_argument(
+ '--output',
+ dest='output',
+ required=True,
+ help='Output file to write results to.')
+ known_args, pipeline_args = parser.parse_known_args(argv)
+
+ # We use the save_main_session option because one or more DoFn's in this
+ # workflow rely on global context (e.g., a module imported at module level).
+ pipeline_options = PipelineOptions(pipeline_args)
+ pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+ # The pipeline will be run on exiting the with block.
+ with beam.Pipeline(options=pipeline_options) as p:
+
+ # Read the text file[pattern] into a PCollection.
+ lines = p | 'Read' >> ReadFromText(known_args.input)
+
+ counts = (
+ lines
+ | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
+ | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
+ | 'GroupAndSum' >> beam.CombinePerKey(sum))
+
+ # Format the counts into a PCollection of strings.
+ def format_result(word, count):
+ return '%s: %d' % (word, count)
+
+ output = counts | 'Format' >> beam.MapTuple(format_result)
+
+ # Write the output using a "Write" transform that has side effects.
+ # pylint: disable=expression-not-assigned
+ output | 'Write' >> WriteToText(known_args.output)
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ run()
diff --git a/playground/backend/new_scio_project.sh
b/playground/backend/new_scio_project.sh
old mode 100644
new mode 100755
index a01fc78c923..80e031f0fd8
--- a/playground/backend/new_scio_project.sh
+++ b/playground/backend/new_scio_project.sh
@@ -15,4 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-yes | sbt new spotify/scio-template.g8
+{ printf scio\\nscio\\n; yes; } | sbt new spotify/scio-template.g8
+
+echo "Compile / run / fork := false" >> scio/build.sbt
diff --git a/playground/docker-compose.local.yaml
b/playground/docker-compose.local.yaml
index e202b6a4dca..92121a2f0c5 100644
--- a/playground/docker-compose.local.yaml
+++ b/playground/docker-compose.local.yaml
@@ -91,6 +91,10 @@ services:
- "8090:8090"
depends_on:
- redis
+ # Impose limits on container to better mimic real deployment environment
+ mem_limit: 2048M
+ ulimits:
+ rss: 1073741824
frontend:
image: apache/beam_playground-frontend