quanzhian commented on issue #1753: URL: https://github.com/apache/incubator-seatunnel/issues/1753#issuecomment-1113105967
I have an idea, @legendtkl @ruanwenjun @BenJFan: add a custom `SET` statement whose key carries the `flink_env.` prefix, for example `SET flink_env.execution.parallelism = 1;`. Statements with this prefix are treated as Flink environment settings, and all other statements are passed through as Flink SQL.

```
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

--
-- This config file is a demonstration of sql processing in SeaTunnel config
--
--

SET flink_env.execution.parallelism = 1;
SET flink_env.execution.checkpoint.interval = 10000;
SET flink_env.execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint";

SET 'table.dml-sync' = 'true';

CREATE TABLE events (
    f_type INT,
    f_uid INT,
    ts AS localtimestamp,
    WATERMARK FOR ts AS ts
) WITH (
    'connector' = 'datagen',
    'rows-per-second'='5',
    'fields.f_type.min'='1',
    'fields.f_type.max'='5',
    'fields.f_uid.min'='1',
    'fields.f_uid.max'='1000'
);

CREATE TABLE print_table (
    type INT,
    uid INT,
    lstmt TIMESTAMP
) WITH (
    'connector' = 'print',
    'sink.parallelism' = '1'
);

INSERT INTO print_table SELECT * FROM events where f_type = 1;
```

This is my implementation and test run:

```
[xxx@bigdata-app03 apache-seatunnel-incubating-2.1.1-SNAPSHOT]# ./bin/start-seatunnel-sql.sh -m yarn-cluster -ys 1 -yjm 2G -ytm 3G -ynm seatunnel_flink_job --config /mnt/services/seatunnel/flink_sql_01.conf
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt/services/flink-1.12.1/lib/log4j-slf4j-impl-2.17.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/3.1.4.0-315/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
############# print args #############
--config /mnt/services/seatunnel/flink_sql_01.conf
############# print env #############
execution.parallelism = 1
execution.checkpoint.interval = 10000
execution.planner = blink
############# print sql #############
SET 'table.local-time-zone' = 'Asia/Shanghai'
CREATE TABLE events ( f_type INT, f_uid INT, ts AS localtimestamp, WATERMARK FOR ts AS ts ) WITH ( 'connector' = 'datagen', 'rows-per-second'='5', 'fields.f_type.min'='1', 'fields.f_type.max'='5', 'fields.f_uid.min'='1', 'fields.f_uid.max'='1000' )
CREATE TABLE print_table ( type INT, uid INT, lstmt TIMESTAMP ) WITH ( 'connector' = 'print' )
INSERT INTO print_table SELECT * FROM events where f_type = 1
#############setConfiguration#############
key='table.local-time-zone' value= 'Asia/Shanghai'
2022-04-29 15:00:50,664 INFO org.apache.hadoop.yarn.client.AHSProxy [] - Connecting to Application History server at bigdata-master01/172.18.247.15:10200
2022-04-29 15:00:50,673 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2022-04-29 15:00:50,794 INFO org.apache.hadoop.conf.Configuration [] - found resource resource-types.xml at file:/etc/hadoop/3.1.4.0-315/0/resource-types.xml
2022-04-29 15:00:50,853 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Cluster specification: ClusterSpecification{masterMemoryMB=2048, taskManagerMemoryMB=3072, slotsPerTaskManager=1}
2022-04-29 15:00:51,345 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2022-04-29 15:00:56,002 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Submitting application master application_1643094720025_43463
2022-04-29 15:00:56,038 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl [] - Submitted application application_1643094720025_43463
2022-04-29 15:00:56,039 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Waiting for the cluster to be allocated
2022-04-29 15:00:56,041 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Deploying cluster, current state ACCEPTED
2022-04-29 15:01:06,899 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - YARN application has been deployed successfully.
2022-04-29 15:01:06,900 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface bigdata-datanode01:39023 of application 'application_1643094720025_43463'.
############# print flink applicationId #############
application_1643094720025_43463
Job has been submitted with JobID ccd2030e27d1fae4238f491d431a4c36
############# print jobId #############
job-submitted-success:ccd2030e27d1fae4238f491d431a4c36
```

This is my core code:

```
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.core.sql.job;

import org.apache.seatunnel.common.utils.VariablesSubstitute;
import org.apache.seatunnel.core.sql.splitter.SqlStatementSplitter;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class JobInfo {

    private static final String DELIMITER = "=";

    private String jobContent;
    private List<String> flinkEnvList = new ArrayList<>();
    private List<String> flinkSqlList = new ArrayList<>();

    public JobInfo(String jobContent) {
        this.jobContent = jobContent;
    }

    public String getJobContent() {
        return jobContent;
    }

    public List<String> getFlinkEnvList() {
        return flinkEnvList;
    }

    public List<String> getFlinkSqlList() {
        return flinkSqlList;
    }

    public void substitute(List<String> variables) {
        Map<String, String> substituteMap = variables.stream()
            .filter(v -> v.contains(DELIMITER))
            .collect(Collectors.toMap(v -> v.split(DELIMITER)[0], v -> v.split(DELIMITER)[1]));
        jobContent = VariablesSubstitute.substitute(jobContent, substituteMap);
        this.analysisJobContent();
    }

    private void analysisJobContent() {
        List<String> stmts = SqlStatementSplitter.normalizeStatements(this.jobContent);
        for (String stmt : stmts) {
            String patternStr = FlinkSqlConstant.PATTERN_FLINK_ENV_REGEX;
            Pattern pattern = Pattern.compile(patternStr, FlinkSqlConstant.DEFAULT_PATTERN_FLAGS);
            Matcher matcher = pattern.matcher(stmt);
            if (matcher.find() && stmt.trim().toUpperCase().startsWith(FlinkSqlConstant.FLINK_SQL_SET_PREFIX)) {
                String replaceStr = matcher.replaceAll("");
                flinkEnvList.add(replaceStr);
            } else {
                flinkSqlList.add(stmt);
            }
        }
        LogPrint.envPrint(flinkEnvList);
        LogPrint.sqlPrint(flinkSqlList);
    }
}
```

```
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.core.sql.job;

import java.util.regex.Pattern;

public final class FlinkSqlConstant {

    public static final int DEFAULT_PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL;

    public static final int FLINK_SQL_SET_OPERANDS = 3;

    public static final String QUERY_JOBID_KEY_WORD = "job-submitted-success:";

    public static final String SYSTEM_LINE_SEPARATOR = System.getProperty("line.separator");

    public static final String PATTERN_FLINK_ENV_REGEX = "SET\\s+flink_env\\.";

    public static final String FLINK_SQL_SET_PREFIX = "SET";

    private FlinkSqlConstant() {
    }
}
```
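
For anyone who wants to try the splitting rule outside SeaTunnel, here is a minimal stand-alone sketch of the same idea: statements that begin with `SET flink_env.` are collected as environment entries with the prefix stripped, and everything else is kept as Flink SQL. The class name `FlinkEnvSplitSketch`, its fields, and the naive `;` splitter are assumptions made for illustration only (the code above relies on `SqlStatementSplitter.normalizeStatements` instead). The sketch also anchors the match at the start of the statement with `lookingAt()`, which is a variation on the posted code, not a description of it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-alone sketch; this is not the SeaTunnel JobInfo class.
final class FlinkEnvSplitSketch {

    // Same prefix as PATTERN_FLINK_ENV_REGEX in the post, matched case-insensitively.
    private static final Pattern FLINK_ENV_PREFIX =
        Pattern.compile("SET\\s+flink_env\\.", Pattern.CASE_INSENSITIVE);

    final List<String> envEntries = new ArrayList<>();
    final List<String> sqlStatements = new ArrayList<>();

    // Naive splitter (assumption): breaks on every ';', so it does not handle
    // semicolons inside string literals or comments.
    static FlinkEnvSplitSketch parse(String script) {
        FlinkEnvSplitSketch result = new FlinkEnvSplitSketch();
        for (String raw : script.split(";")) {
            String stmt = raw.trim();
            if (stmt.isEmpty()) {
                continue;
            }
            Matcher matcher = FLINK_ENV_PREFIX.matcher(stmt);
            if (matcher.lookingAt()) {
                // Keep only "key = value", dropping the "SET flink_env." prefix.
                result.envEntries.add(stmt.substring(matcher.end()));
            } else {
                // Plain SQL, including ordinary SET statements, is left untouched.
                result.sqlStatements.add(stmt);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        FlinkEnvSplitSketch result = parse(
            "SET flink_env.execution.parallelism = 1;"
                + " SET 'table.dml-sync' = 'true';"
                + " INSERT INTO print_table SELECT * FROM events where f_type = 1;");
        System.out.println("env: " + result.envEntries);
        System.out.println("sql: " + result.sqlStatements);
    }
}
```

Because the match is anchored, a statement that only mentions `SET flink_env.` inside a string literal stays in the SQL list instead of being treated as an environment entry.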
