This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 9bb83c0  [FLINK-27009] Add example for running SQL scripts using the 
operator
9bb83c0 is described below

commit 9bb83c0c9180b77c1ce6b3d96c0c3bb1f93d51de
Author: Gyula Fora <[email protected]>
AuthorDate: Wed Jul 6 14:15:47 2022 +0200

    [FLINK-27009] Add example for running SQL scripts using the operator
---
 examples/flink-sql-runner-example/Dockerfile       |  23 ++++
 examples/flink-sql-runner-example/README.md        |  80 ++++++++++++
 examples/flink-sql-runner-example/pom.xml          | 134 +++++++++++++++++++++
 examples/flink-sql-runner-example/sql-example.yaml |  41 +++++++
 .../sql-scripts/simple.sql                         |  27 +++++
 .../sql-scripts/statement-set.sql                  |  38 ++++++
 .../java/org/apache/flink/examples/SqlRunner.java  |  96 +++++++++++++++
 .../src/main/resources/log4j2.properties           |  25 ++++
 pom.xml                                            |   1 +
 9 files changed, 465 insertions(+)

diff --git a/examples/flink-sql-runner-example/Dockerfile 
b/examples/flink-sql-runner-example/Dockerfile
new file mode 100644
index 0000000..67bb563
--- /dev/null
+++ b/examples/flink-sql-runner-example/Dockerfile
@@ -0,0 +1,23 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+FROM flink:1.15.0
+
+RUN mkdir /opt/flink/usrlib
+ADD target/flink-sql-runner-example-*.jar /opt/flink/usrlib/sql-runner.jar
+ADD sql-scripts /opt/flink/usrlib/sql-scripts
diff --git a/examples/flink-sql-runner-example/README.md 
b/examples/flink-sql-runner-example/README.md
new file mode 100644
index 0000000..942cbb4
--- /dev/null
+++ b/examples/flink-sql-runner-example/README.md
@@ -0,0 +1,80 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Flink Kubernetes Operator SQL Example
+
+## Overview
+
+This is an end-to-end example of running Flink SQL scripts using the Flink 
Kubernetes Operator.
+
+It is only intended to serve as a showcase of how Flink SQL can be executed on 
the operator and users are expected to extend the implementation and 
dependencies based on their production needs. 
+
+Currently, it is not planned to add direct API support for SQL submission to 
the Kubernetes operator due to the complexity of image and dependency 
management that is specific to each use-case.
+At the same time we are confident that using these examples as a starting 
point the operator would cover all user needs. If Apache Flink itself extends 
the SQL support for Application mode in the future, the operator will aim to 
support that.
+
+*What's in this example?*
+
+ 1. SQL Script runner Flink Java application
+ 2. DockerFile to build custom image with script runner + SQL scripts
+ 3. Example YAML for submitting scripts using the operator
+
+## How does it work?
+
+As Flink doesn't support submitting SQL scripts directly as jobs, we have 
created a simple Flink Java application that takes the user script and executes 
it using the `TableEnvironment#executeSql` method.
+
+The SQL Runner will allow us to execute SQL scripts as if they were simple 
Flink Application jars, something that already works quite well with the 
operator. We package the included SQL Runner implementation together with the 
SQL scripts under `sql-scripts` into a docker image and we use it in our 
`FlinkDeployment` yaml file.
+
+***Note:*** *While the included SqlRunner should work for most simple cases, 
it is not expected to be very robust or battle tested. If you find any bugs or 
limitations, feel free to open Jira tickets and bugfix PRs.*
+
+## Usage
+
+The following steps assume that you have the Flink Kubernetes Operator 
installed and running in your environment.
+
+**Step 1**: Build Sql Runner maven project
+```bash
+cd examples/flink-sql-runner-example
+mvn clean package
+```
+
+**Step 2**: Add your SQL script files under the `sql-scripts` directory
+
+**Step 3**: Build docker image
+```bash
+# Uncomment when building for local minikube env:
+# eval $(minikube docker-env)
+
+DOCKER_BUILDKIT=1 docker build . -t flink-sql-runner-example:latest
+```
+This step will create an image based on an official Flink base image including 
the SQL runner jar and your user scripts.
+
+**Step 4**: Create FlinkDeployment Yaml and Submit
+
+Edit the included `sql-example.yaml` so that the `job.args` section points to 
the SQL script that you wish to execute, then submit it.
+
+```bash
+kubectl apply -f sql-example.yaml
+```
+
+## Connectors and other extensions
+
+This example will only work with the very basic table and connector types out 
of the box, however enabling new ones is very easy.
+
+Simply find your [required 
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/overview/)
 and add it as compile dependency to the `flink-sql-runner-example` project 
`pom.xml`. This will ensure that is packaged into the sql-runner fatjar and 
will be available for you on the cluster.
+
+Once you dive deeper you will quickly find that the SqlRunner implementation 
is very basic and might not cover your more advanced needs. Feel free to simply 
extend or customise the code as necessary for your requirements.
diff --git a/examples/flink-sql-runner-example/pom.xml 
b/examples/flink-sql-runner-example/pom.xml
new file mode 100644
index 0000000..4c53652
--- /dev/null
+++ b/examples/flink-sql-runner-example/pom.xml
@@ -0,0 +1,134 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-kubernetes-operator-parent</artifactId>
+                       <version>1.1-SNAPSHOT</version>
+                       <relativePath>../..</relativePath>
+       </parent>
+
+       <artifactId>flink-sql-runner-example</artifactId>
+       <name>Flink SQL Runner Example</name>
+
+       <dependencies>
+               <!-- Apache Flink dependencies -->
+               <!-- These dependencies are provided, because they should not 
be packaged into the JAR file. -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java</artifactId>
+                       <version>${flink.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-java</artifactId>
+                       <version>${flink.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- Add connector dependencies here. They must be in the 
default scope (compile). -->
+
+               <!-- Example:
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-kafka</artifactId>
+                       <version>${flink.version}</version>
+               </dependency>
+               -->
+
+               <!-- Add logging framework, to produce console output when 
running in the IDE. -->
+               <!-- These dependencies are excluded from the application JAR 
by default. -->
+               <dependency>
+                       <groupId>org.slf4j</groupId>
+                       <artifactId>slf4j-api</artifactId>
+                       <version>${slf4j.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-slf4j-impl</artifactId>
+                       <version>${log4j.version}</version>
+                       <scope>runtime</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-api</artifactId>
+                       <version>${log4j.version}</version>
+                       <scope>runtime</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-core</artifactId>
+                       <version>${log4j.version}</version>
+                       <scope>runtime</scope>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <version>3.1.1</version>
+                               <executions>
+                                       <!-- Run shade goal on package phase -->
+                                       <execution>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <artifactSet>
+                                                               <excludes>
+                                                                       
<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
+                                                                       
<exclude>com.google.code.findbugs:jsr305</exclude>
+                                                                       
<exclude>org.slf4j:*</exclude>
+                                                                       
<exclude>org.apache.logging.log4j:*</exclude>
+                                                               </excludes>
+                                                       </artifactSet>
+                                                       <filters>
+                                                               <filter>
+                                                                       <!-- Do 
not copy the signatures in the META-INF folder.
+                                                                       
Otherwise, this might cause SecurityExceptions when using the JAR. -->
+                                                                       
<artifact>*:*</artifact>
+                                                                       
<excludes>
+                                                                               
<exclude>META-INF/*.SF</exclude>
+                                                                               
<exclude>META-INF/*.DSA</exclude>
+                                                                               
<exclude>META-INF/*.RSA</exclude>
+                                                                       
</excludes>
+                                                               </filter>
+                                                       </filters>
+                                                       <transformers>
+                                                               <transformer 
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                                               <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                                                       
<mainClass>org.apache.flink.examples.SqlRunner</mainClass>
+                                                               </transformer>
+                                                       </transformers>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+</project>
diff --git a/examples/flink-sql-runner-example/sql-example.yaml 
b/examples/flink-sql-runner-example/sql-example.yaml
new file mode 100644
index 0000000..14865e0
--- /dev/null
+++ b/examples/flink-sql-runner-example/sql-example.yaml
@@ -0,0 +1,41 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkDeployment
+metadata:
+  name: sql-example
+spec:
+  image: flink-sql-runner-example:latest
+  flinkVersion: v1_15
+  flinkConfiguration:
+    taskmanager.numberOfTaskSlots: "1"
+  serviceAccount: flink
+  jobManager:
+    resource:
+      memory: "2048m"
+      cpu: 1
+  taskManager:
+    resource:
+      memory: "2048m"
+      cpu: 1
+  job:
+    jarURI: local:///opt/flink/usrlib/sql-runner.jar
+    args: ["/opt/flink/usrlib/sql-scripts/simple.sql"]
+    parallelism: 1
+    upgradeMode: stateless
diff --git a/examples/flink-sql-runner-example/sql-scripts/simple.sql 
b/examples/flink-sql-runner-example/sql-scripts/simple.sql
new file mode 100644
index 0000000..8b8634e
--- /dev/null
+++ b/examples/flink-sql-runner-example/sql-scripts/simple.sql
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE blackhole_table (
+  name STRING,
+  age INT
+) WITH (
+  'connector' = 'blackhole'
+);
+
+INSERT INTO blackhole_table
+  VALUES ('fred flintstone', 35), ('barney rubble', 32);
diff --git a/examples/flink-sql-runner-example/sql-scripts/statement-set.sql 
b/examples/flink-sql-runner-example/sql-scripts/statement-set.sql
new file mode 100644
index 0000000..ec45fd2
--- /dev/null
+++ b/examples/flink-sql-runner-example/sql-scripts/statement-set.sql
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE blackhole_table (
+  name STRING,
+  age INT
+) WITH (
+  'connector' = 'blackhole'
+);
+CREATE TABLE blackhole_table2 (
+  name STRING,
+  age INT
+) WITH (
+  'connector' = 'blackhole'
+);
+
+EXECUTE STATEMENT SET
+BEGIN
+INSERT INTO blackhole_table
+  VALUES ('fred flintstone', 35), ('barney rubble', 32);
+INSERT INTO blackhole_table2
+  VALUES ('fred flintstone', 35), ('barney rubble', 32);
+END;
diff --git 
a/examples/flink-sql-runner-example/src/main/java/org/apache/flink/examples/SqlRunner.java
 
b/examples/flink-sql-runner-example/src/main/java/org/apache/flink/examples/SqlRunner.java
new file mode 100644
index 0000000..0717990
--- /dev/null
+++ 
b/examples/flink-sql-runner-example/src/main/java/org/apache/flink/examples/SqlRunner.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.util.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Main class for executing SQL scripts. */
+public class SqlRunner {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SqlRunner.class);
+
+    private static final String STATEMENT_DELIMITER = ";"; // a statement 
should end with `;`
+    private static final String LINE_DELIMITER = "\n";
+
+    private static final String COMMENT_PATTERN = 
"(--.*)|(((\\/\\*)+?[\\w\\W]+?(\\*\\/)+))";
+
+    public static void main(String[] args) throws Exception {
+        if (args.length != 1) {
+            throw new Exception("Exactly one argument is expected.");
+        }
+        var script = FileUtils.readFileUtf8(new File(args[0]));
+        var statements = parseStatements(script);
+
+        var tableEnv = TableEnvironment.create(new Configuration());
+
+        for (String statement : statements) {
+            LOG.info("Executing:\n{}", statement);
+            tableEnv.executeSql(statement);
+        }
+    }
+
+    public static List<String> parseStatements(String script) {
+        var formatted = formatSqlFile(script).replaceAll(COMMENT_PATTERN, "");
+        var statements = new ArrayList<String>();
+
+        StringBuilder current = null;
+        boolean statementSet = false;
+        for (String line : formatted.split("\n")) {
+            var trimmed = line.trim();
+            if (trimmed.isBlank()) {
+                continue;
+            }
+            if (current == null) {
+                current = new StringBuilder();
+            }
+            if (trimmed.startsWith("EXECUTE STATEMENT SET")) {
+                statementSet = true;
+            }
+            current.append(trimmed);
+            current.append("\n");
+            if (trimmed.endsWith(STATEMENT_DELIMITER)) {
+                if (!statementSet || trimmed.equals("END;")) {
+                    statements.add(current.toString());
+                    current = null;
+                    statementSet = false;
+                }
+            }
+        }
+        return statements;
+    }
+
+    public static String formatSqlFile(String content) {
+        String trimmed = content.trim();
+        StringBuilder formatted = new StringBuilder();
+        formatted.append(trimmed);
+        if (!trimmed.endsWith(STATEMENT_DELIMITER)) {
+            formatted.append(STATEMENT_DELIMITER);
+        }
+        formatted.append(LINE_DELIMITER);
+        return formatted.toString();
+    }
+}
diff --git 
a/examples/flink-sql-runner-example/src/main/resources/log4j2.properties 
b/examples/flink-sql-runner-example/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..cea22d8
--- /dev/null
+++ b/examples/flink-sql-runner-example/src/main/resources/log4j2.properties
@@ -0,0 +1,25 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+rootLogger.level=INFO
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/pom.xml b/pom.xml
index 981815f..68216c2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,7 @@ under the License.
         <module>flink-kubernetes-operator</module>
         <module>flink-kubernetes-webhook</module>
         <module>flink-kubernetes-docs</module>
+        <module>examples/flink-sql-runner-example</module>
     </modules>
 
     <properties>

Reply via email to