quanzhian commented on issue #1753:
URL: https://github.com/apache/incubator-seatunnel/issues/1753#issuecomment-1113105967

   I have an idea @legendtkl @ruanwenjun @BenJFan 
   
   Customize a SET statement whose key carries a `flink_env.` prefix, e.g. `SET flink_env.execution.parallelism = 1;`, so that engine settings can be declared inline in the same SQL file as the job itself.
   
   ```
   --
   -- Licensed to the Apache Software Foundation (ASF) under one or more
   -- contributor license agreements.  See the NOTICE file distributed with
   -- this work for additional information regarding copyright ownership.
   -- The ASF licenses this file to You under the Apache License, Version 2.0
   -- (the "License"); you may not use this file except in compliance with
   -- the License.  You may obtain a copy of the License at
   --
   --     http://www.apache.org/licenses/LICENSE-2.0
   --
   -- Unless required by applicable law or agreed to in writing, software
   -- distributed under the License is distributed on an "AS IS" BASIS,
   -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   -- See the License for the specific language governing permissions and
   -- limitations under the License.
   --
   
   --
   -- This config file is a demonstration of sql processing in SeaTunnel config
   --
   --
   
   SET flink_env.execution.parallelism = 1;
   SET flink_env.execution.checkpoint.interval = 10000;
   SET flink_env.execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint";
   
   SET 'table.dml-sync' = 'true';
   
   CREATE TABLE events (
     f_type INT,
     f_uid INT,
     ts AS localtimestamp,
     WATERMARK FOR ts AS ts
   ) WITH (
     'connector' = 'datagen',
     'rows-per-second'='5',
     'fields.f_type.min'='1',
     'fields.f_type.max'='5',
     'fields.f_uid.min'='1',
     'fields.f_uid.max'='1000'
   );
   
   CREATE TABLE print_table (
     type INT,
     uid INT,
     lstmt TIMESTAMP
   ) WITH (
     'connector' = 'print',
     'sink.parallelism' = '1'
   );
   
   INSERT INTO print_table SELECT * FROM events WHERE f_type = 1;
   
   ```
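   
   The idea in code: split the script into statements, route anything matching the `flink_env.` prefix into an env list (with the prefix stripped), and pass everything else through as plain Flink SQL. A minimal standalone sketch of this routing (the class and method names here are mine for illustration, not the actual SeaTunnel code):
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   import java.util.regex.Pattern;
   
   // Illustrative sketch only; the real splitting/parsing lives in SeaTunnel.
   public class EnvSqlRouter {
   
       private static final Pattern ENV_PREFIX =
               Pattern.compile("SET\\s+flink_env\\.", Pattern.CASE_INSENSITIVE);
   
       public final List<String> envList = new ArrayList<>();
       public final List<String> sqlList = new ArrayList<>();
   
       public void route(List<String> statements) {
           for (String stmt : statements) {
               if (ENV_PREFIX.matcher(stmt).find()) {
                   // "SET flink_env.execution.parallelism = 1" -> "execution.parallelism = 1"
                   envList.add(ENV_PREFIX.matcher(stmt).replaceAll("").trim());
               } else {
                   sqlList.add(stmt);
               }
           }
       }
   
       public static void main(String[] args) {
           EnvSqlRouter router = new EnvSqlRouter();
           router.route(Arrays.asList(
                   "SET flink_env.execution.parallelism = 1",
                   "SET 'table.dml-sync' = 'true'",
                   "INSERT INTO print_table SELECT * FROM events WHERE f_type = 1"));
           System.out.println(router.envList); // env settings, prefix stripped
           System.out.println(router.sqlList); // statements handed to the SQL executor
       }
   }
   ```
   
   With this split, env settings can be applied before job submission while the remaining statements go to the Flink SQL executor untouched.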
   
   
   
   Here is my implementation and the testing process:
   
   ```
   [xxx@bigdata-app03 apache-seatunnel-incubating-2.1.1-SNAPSHOT]# ./bin/start-seatunnel-sql.sh -m yarn-cluster -ys 1 -yjm 2G -ytm 3G -ynm seatunnel_flink_job --config /mnt/services/seatunnel/flink_sql_01.conf
   
   Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
   SLF4J: Class path contains multiple SLF4J bindings.
   SLF4J: Found binding in [jar:file:/mnt/services/flink-1.12.1/lib/log4j-slf4j-impl-2.17.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in [jar:file:/usr/hdp/3.1.4.0-315/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
   SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
   
    ############# print args ############# 
   --config
   /mnt/services/seatunnel/flink_sql_01.conf
   
   
    ############# print env ############# 
   execution.parallelism = 1
   execution.checkpoint.interval = 10000
   execution.planner = blink
   
   
    ############# print sql ############# 
   SET 'table.local-time-zone' = 'Asia/Shanghai'
   CREATE TABLE events (
     f_type INT,
     f_uid INT,
     ts AS localtimestamp,
     WATERMARK FOR ts AS ts
   ) WITH (
     'connector' = 'datagen',
     'rows-per-second'='5',
     'fields.f_type.min'='1',
     'fields.f_type.max'='5',
     'fields.f_uid.min'='1',
     'fields.f_uid.max'='1000'
   )
   CREATE TABLE print_table (
     type INT,
     uid INT,
     lstmt TIMESTAMP
   ) WITH (
     'connector' = 'print'
   )
   INSERT INTO print_table SELECT * FROM events where f_type = 1
   
   #############setConfiguration#############
     key='table.local-time-zone' value= 'Asia/Shanghai'
   2022-04-29 15:00:50,664 INFO  org.apache.hadoop.yarn.client.AHSProxy                  [] - Connecting to Application History server at bigdata-master01/172.18.247.15:10200
   2022-04-29 15:00:50,673 INFO  org.apache.flink.yarn.YarnClusterDescriptor             [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
   2022-04-29 15:00:50,794 INFO  org.apache.hadoop.conf.Configuration                    [] - found resource resource-types.xml at file:/etc/hadoop/3.1.4.0-315/0/resource-types.xml
   2022-04-29 15:00:50,853 INFO  org.apache.flink.yarn.YarnClusterDescriptor             [] - Cluster specification: ClusterSpecification{masterMemoryMB=2048, taskManagerMemoryMB=3072, slotsPerTaskManager=1}
   2022-04-29 15:00:51,345 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
   2022-04-29 15:00:56,002 INFO  org.apache.flink.yarn.YarnClusterDescriptor             [] - Submitting application master application_1643094720025_43463
   2022-04-29 15:00:56,038 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl   [] - Submitted application application_1643094720025_43463
   2022-04-29 15:00:56,039 INFO  org.apache.flink.yarn.YarnClusterDescriptor             [] - Waiting for the cluster to be allocated
   2022-04-29 15:00:56,041 INFO  org.apache.flink.yarn.YarnClusterDescriptor             [] - Deploying cluster, current state ACCEPTED
   2022-04-29 15:01:06,899 INFO  org.apache.flink.yarn.YarnClusterDescriptor             [] - YARN application has been deployed successfully.
   2022-04-29 15:01:06,900 INFO  org.apache.flink.yarn.YarnClusterDescriptor             [] - Found Web Interface bigdata-datanode01:39023 of application 'application_1643094720025_43463'.
   
    ############# print flink applicationId ############# 
   application_1643094720025_43463
   
   Job has been submitted with JobID ccd2030e27d1fae4238f491d431a4c36
   
    ############# print jobId ############# 
   job-submitted-success:ccd2030e27d1fae4238f491d431a4c36
   
   ```
   
   
   
   Here is my core code:
   
   ```
   /*
    * Licensed to the Apache Software Foundation (ASF) under one or more
    * contributor license agreements. See the NOTICE file distributed with
    * this work for additional information regarding copyright ownership.
    * The ASF licenses this file to You under the Apache License, Version 2.0
    * (the "License"); you may not use this file except in compliance with
    * the License. You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
   
   package org.apache.seatunnel.core.sql.job;
   
   import org.apache.seatunnel.common.utils.VariablesSubstitute;
   import org.apache.seatunnel.core.sql.splitter.SqlStatementSplitter;
   
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;
   import java.util.stream.Collectors;
   
   public class JobInfo {
   
       private static final String DELIMITER = "=";
   
       private String jobContent;
   
       private List<String> flinkEnvList = new ArrayList<>();
   
       private List<String> flinkSqlList = new ArrayList<>();
   
       public JobInfo(String jobContent) {
           this.jobContent = jobContent;
       }
   
       public String getJobContent() {
           return jobContent;
       }
   
       public List<String> getFlinkEnvList() {
           return flinkEnvList;
       }
   
       public List<String> getFlinkSqlList() {
           return flinkSqlList;
       }
   
        public void substitute(List<String> variables) {
            Map<String, String> substituteMap = variables.stream()
                    .filter(v -> v.contains(DELIMITER))
                    // split with a limit of 2 so '=' characters inside the value are preserved
                    .collect(Collectors.toMap(v -> v.split(DELIMITER, 2)[0], v -> v.split(DELIMITER, 2)[1]));
            jobContent = VariablesSubstitute.substitute(jobContent, substituteMap);
            this.analysisJobContent();
        }
   
        private void analysisJobContent() {
            List<String> stmts = SqlStatementSplitter.normalizeStatements(this.jobContent);
            // Compile once outside the loop; the pattern is the same for every statement.
            Pattern pattern = Pattern.compile(FlinkSqlConstant.PATTERN_FLINK_ENV_REGEX, FlinkSqlConstant.DEFAULT_PATTERN_FLAGS);
            for (String stmt : stmts) {
                Matcher matcher = pattern.matcher(stmt);
                if (matcher.find() && stmt.trim().toUpperCase().startsWith(FlinkSqlConstant.FLINK_SQL_SET_PREFIX)) {
                    // Strip the "SET flink_env." prefix so only the raw env key/value remains.
                    flinkEnvList.add(matcher.replaceAll(""));
                } else {
                    flinkSqlList.add(stmt);
                }
            }
            LogPrint.envPrint(flinkEnvList);
            LogPrint.sqlPrint(flinkSqlList);
        }
   
   }
   
   ```
   
   ```
   /*
    * Licensed to the Apache Software Foundation (ASF) under one or more
    * contributor license agreements. See the NOTICE file distributed with
    * this work for additional information regarding copyright ownership.
    * The ASF licenses this file to You under the Apache License, Version 2.0
    * (the "License"); you may not use this file except in compliance with
    * the License. You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
   
   package org.apache.seatunnel.core.sql.job;
   
   import java.util.regex.Pattern;
   
   public final class FlinkSqlConstant {
   
        public static final int DEFAULT_PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL;

        public static final int FLINK_SQL_SET_OPERANDS = 3;

        public static final String QUERY_JOBID_KEY_WORD = "job-submitted-success:";

        public static final String SYSTEM_LINE_SEPARATOR = System.getProperty("line.separator");

        public static final String PATTERN_FLINK_ENV_REGEX = "SET\\s+flink_env\\.";
   
       public static final String FLINK_SQL_SET_PREFIX = "SET";
   
       private FlinkSqlConstant() { }
   }
   
   ```
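   
   Since `DEFAULT_PATTERN_FLAGS` includes `CASE_INSENSITIVE`, lower- or mixed-case prefixes like `set FLINK_ENV.` are accepted as well. A quick standalone check of that behavior (class and method names here are hypothetical, for illustration only):
   
   ```java
   import java.util.regex.Pattern;
   
   // Verifies the env-prefix regex above in isolation; not part of SeaTunnel.
   public class EnvRegexDemo {
   
       private static final Pattern ENV_PREFIX =
               Pattern.compile("SET\\s+flink_env\\.", Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
   
       // Returns the statement with the "SET flink_env." prefix removed,
       // or null when the statement is not a flink_env setting.
       public static String stripEnvPrefix(String stmt) {
           if (!ENV_PREFIX.matcher(stmt).find()) {
               return null;
           }
           return ENV_PREFIX.matcher(stmt).replaceAll("").trim();
       }
   
       public static void main(String[] args) {
           // Mixed case still matches, leaving only the raw key/value pair.
           System.out.println(stripEnvPrefix("set FLINK_ENV.execution.parallelism = 1"));
           // A plain Flink SQL SET statement does not match and stays with the SQL executor.
           System.out.println(stripEnvPrefix("SET 'table.dml-sync' = 'true'"));
       }
   }
   ```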
   
   
   
   

