daria-malkova commented on a change in pull request #16611:
URL: https://github.com/apache/beam/pull/16611#discussion_r800871562
##########
File path: playground/backend/internal/preparers/python_preparers.go
##########
@@ -112,3 +133,76 @@ func writeToFile(to *os.File, str string) error {
}
return nil
}
+
+// saveGraph adds code to pipeline to save the pipeline's graph to the file
GraphFileName
+func saveGraph(from *os.File, to *os.File) error {
+ newLine := false
+ reg := regexp.MustCompile(findPipelinePattern)
+ scanner := bufio.NewScanner(from)
+ pipelineName := ""
+ spaces := ""
+ err := errors.New("")
+
+ for scanner.Scan() {
+ line := scanner.Text()
+ if pipelineName == "" {
+ // Try to find where the beam pipeline declaration is
located
+ err = writeLineToFile(findPipelineLine)(newLine, to,
&line, &spaces, &pipelineName, ®)
+ } else if reg != nil {
+ // Try to find where beam pipeline definition is
finished and add code to store the graph
+ reg =
regexp.MustCompile(fmt.Sprintf(indentationPattern, spaces))
+ err = writeLineToFile(addCodeForGraph)(newLine, to,
&line, &spaces, &pipelineName, ®)
+ } else {
+ err = writeLineToFile(func(line, spaces, pipelineName
*string, reg **regexp.Regexp) {
+ // No need to find or change anything, just
write current line to file
+ })(newLine, to, &line, &spaces, &pipelineName, nil)
+ }
+ if err != nil {
+ logger.Errorf("Preparation: Error during write \"%s\"
to tmp file, err: %s\n", line, err.Error())
+ return err
+ }
+ newLine = true
+ }
+ return scanner.Err()
+}
+
+//findPipelineLine looking for a declaration of a beam pipeline and it's name
+func findPipelineLine(line, spaces, pipelineName *string, reg **regexp.Regexp)
{
Review comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]