[https://issues.apache.org/jira/browse/FLINK-9166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439361#comment-16439361]
Fabian Hueske commented on FLINK-9166:
--------------------------------------
Thanks for opening this issue [[email protected]].
I think this has nothing to do with SQL, but rather with the number of jobs
running on a single JM.
The JM is implemented as an Actor and handles certain communication tasks
synchronously in its main thread. Having that many jobs running on a JM can
cause the JM to become unresponsive.
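As a rough illustration (plain Java, not Flink's actual internals), a component
that funnels every message through one thread stalls all callers as soon as one
message is slow:
{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model of an actor-style component: one thread processes all messages.
public class SingleThreadedCoordinator {

  private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

  // Each job submission, heartbeat, or status request is queued on the same
  // thread. One expensive task (e.g. building a large ExecutionGraph) delays
  // every message queued behind it, so clients eventually see timeouts.
  public void handle(Runnable message) {
    mainThread.submit(message);
  }
}
{code}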
I'd suggest distributing the queries across more Flink JMs / clusters. If you
run this on a resource manager such as YARN, it is fairly easy to start a new
cluster.
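For example, something like this could register only a slice of the queries per
job, so each batch can be submitted to its own cluster. This is a rough sketch
reusing your getKafkaStreamOfRows helper; BATCH_SIZE, the batch-index argument,
and treating Queries.sample as a List<String> are assumptions on my side:
{code:java}
import java.util.List;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public static void main(String[] args) throws Exception {
  final int BATCH_SIZE = 10;                        // assumption: queries per job
  final int batchIndex = Integer.parseInt(args[0]); // assumption: passed per submission

  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  tableEnv.registerDataStream("structStream", getKafkaStreamOfRows(env));

  // Register only this job's slice of the queries; every other batch runs
  // as a separate job on its own JM / cluster.
  List<String> queries = Queries.sample; // assumption: a List<String> of all queries
  int from = batchIndex * BATCH_SIZE;
  for (int i = from; i < Math.min(from + BATCH_SIZE, queries.size()); i++) {
    Table result = tableEnv.sqlQuery(queries.get(i));
    tableEnv.toAppendStream(result, Row.class).print();
  }

  env.execute("Kafka Streaming SQL, batch " + batchIndex);
}
{code}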
Best, Fabian
> Performance issue with Flink SQL
> --------------------------------
>
> Key: FLINK-9166
> URL: https://issues.apache.org/jira/browse/FLINK-9166
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.4.2
> Reporter: SUBRAMANYA SURESH
> Priority: Major
> Labels: flink, graph, performance, sql, yarn
>
> With a high number of Flink SQL queries (100 copies of the query below), the
> Flink command-line client fails with "JobManager did not respond within
> 600000 ms" on a YARN cluster.
> * The JobManager log shows nothing after the last TaskManager has started,
> except DEBUG entries saying "job with ID 5cd95f89ed7a66ec44f2d19eca0592f7 not
> found in JobManager", indicating it is likely stuck (creating the
> ExecutionGraph?).
> * The same code works as a standalone Java program locally (with high CPU
> usage initially).
> * Note: each Row in structStream contains 515 columns (many end up null),
> including a column that holds the raw message.
> * In the YARN cluster we specify 18 GB for each TaskManager, 18 GB for the
> JobManager, and 145 TaskManagers with 5 slots each (725 slots total), with a
> parallelism of 725 (the number of partitions in our Kafka source).
> *Query:*
> {code:sql}
> select count(*), 'idnumber' as criteria, Environment, CollectedTimestamp,
>   EventTimestamp, RawMsg, Source
> from structStream
> where Environment='MyEnvironment' and Rule='MyRule' and LogType='MyLogType'
>   and Outcome='Success'
> group by tumble(proctime, INTERVAL '1' SECOND), Environment,
>   CollectedTimestamp, EventTimestamp, RawMsg, Source
> {code}
> *Code:*
> {code:java}
> public static void main(String[] args) throws Exception {
>   FileSystems.newFileSystem(
>       KafkaReadingStreamingJob.class.getResource(WHITELIST_CSV).toURI(),
>       new HashMap<>());
>
>   final StreamExecutionEnvironment streamingEnvironment = getStreamExecutionEnvironment();
>   final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(streamingEnvironment);
>   final DataStream<Row> structStream = getKafkaStreamOfRows(streamingEnvironment);
>   tableEnv.registerDataStream("structStream", structStream);
>   tableEnv.scan("structStream").printSchema();
>
>   for (int i = 0; i < 100; i++) {
>     for (String query : Queries.sample) {
>       // Queries.sample has the one query shown above.
>       Table selectQuery = tableEnv.sqlQuery(query);
>       DataStream<Row> selectQueryStream = tableEnv.toAppendStream(selectQuery, Row.class);
>       selectQueryStream.print();
>     }
>   }
>
>   // execute program
>   streamingEnvironment.execute("Kafka Streaming SQL");
> }
>
> private static DataStream<Row> getKafkaStreamOfRows(StreamExecutionEnvironment environment)
>     throws Exception {
>   Properties properties = getKafkaProperties();
>   // TestDeserializer deserializes the JSON to a Row of string columns (515)
>   // and also adds a column for the raw message.
>   FlinkKafkaConsumer011<Row> consumer = new FlinkKafkaConsumer011<>(
>       KAFKA_TOPIC_TO_CONSUME, new TestDeserializer(getRowTypeInfo()), properties);
>   return environment.addSource(consumer);
> }
>
> private static RowTypeInfo getRowTypeInfo() throws Exception {
>   // This has 515 fields.
>   List<String> fieldNames = DDIManager.getDDIFieldNames();
>   fieldNames.add("rawkafka"); // rawMessage added by TestDeserializer
>   fieldNames.add("proctime");
>   // Fill typeInformationArray with StringType for all but the last field,
>   // which is of type Time.
>   .....
>   return new RowTypeInfo(typeInformationArray, fieldNamesArray);
> }
>
> private static StreamExecutionEnvironment getStreamExecutionEnvironment() throws IOException {
>   final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>   env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
>   env.enableCheckpointing(60000);
>   env.setStateBackend(new FsStateBackend(CHECKPOINT_DIR));
>   env.setParallelism(725);
>   return env;
> }
> {code}