Hi Flink Team,
I am reading streaming data from RabbitMQ and using RMQConnectionConfig to
establish the connection. We use Flink version 1.16.2 and RabbitMQ version
3.10.7. Here is how I set up the connection:

    RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
        .setPrefetchCount(smsInput.prefetchCount)
        .setHost(smsInput.HostServer)
        .setPort(smsInput.HostPort)
        .setUserName(smsInput.HostUserName)
        .setPassword(smsInput.HostPassword)
        .setVirtualHost("/")
        .build();

    ConnectionFactory rabbitMQConnectionFactory =
        connectionConfig.getConnectionFactory();
    rabbitMQConnectionFactory.setRequestedChannelMax(smsInput.prefetchCount);
    // Set prefetchcount

    DataStream<String> stream = executionEnvironment.addSource(
            new RMQSource<String>(connectionConfig,
                smsInput.QueueName,
                new SimpleStringSchema()))
        .setParallelism(1);

Additionally, I have set the prefetch count to 3 so that 3 messages are read
from RabbitMQ at a time. Checkpointing is enabled as follows:

    executionEnvironment.enableCheckpointing(
        smsInput.checkpointIntervalMS, CheckpointingMode.EXACTLY_ONCE, true);

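To make my understanding of the prefetch count concrete, here is a minimal
plain-Java sketch. It does not use any Flink or RabbitMQ classes; the class
and method names are hypothetical. It assumes that the broker keeps at most
prefetchCount messages unacknowledged and that, with checkpointing enabled,
messages are only acknowledged when a checkpoint completes. Please correct me
if this assumption is wrong.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-alone model; none of these names come from Flink or RabbitMQ.
class PrefetchSketch {

    /**
     * Returns how many messages the consumer has received after the given
     * number of checkpoints, assuming the broker keeps at most prefetchCount
     * messages unacknowledged and acks are only sent when a checkpoint completes.
     */
    static int deliveredAfter(int prefetchCount, int queuedMessages, int checkpoints) {
        Deque<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < queuedMessages; i++) {
            queue.add(i);
        }
        int unacked = 0;
        int delivered = 0;
        for (int c = 0; c < checkpoints; c++) {
            // The broker pushes messages until the prefetch window is full.
            while (unacked < prefetchCount && !queue.isEmpty()) {
                queue.poll();
                unacked++;
                delivered++;
            }
            // Checkpoint completes: everything delivered so far is acknowledged.
            unacked = 0;
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Prefetch count 3, 10 queued messages, 2 checkpoints.
        System.out.println(deliveredAfter(3, 10, 2)); // prints 6
    }
}
```

Under this model the number of messages read per checkpoint interval is
bounded by the prefetch count of the single consumer, independent of how many
downstream subtasks exist.
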
The prefetch count seems to work fine, but when I run the job with a
parallelism of 3, it does not behave as expected.
The RabbitMQ source itself is fixed at a parallelism of 1 via
setParallelism(1). However, the other operators, which process the data read
from RabbitMQ, run with a parallelism of 3 because the job is submitted with
the following command:

    bin/flink run -p 3 ../apps/Flink_1.16.2_prefetch.jar \
        ../config/app-config.properties -yD \
        env.java.home=/usr/lib/jvm/java-11-openjdk-11.0.19.0.7-1.el7_9.x86_64

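To illustrate the topology I am describing, here is a small plain-Java sketch
of one source subtask feeding three downstream subtasks. It is only a model
with hypothetical names, not Flink code, and it assumes records are handed
out round-robin starting at subtask 0, which is a simplification of what
happens when the parallelism changes between two operators.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one source subtask feeding N downstream subtasks.
class RebalanceSketch {

    /** Distributes the messages round-robin over the downstream subtasks. */
    static List<List<String>> distribute(List<String> messages, int downstreamParallelism) {
        List<List<String>> subtasks = new ArrayList<>();
        for (int i = 0; i < downstreamParallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            // Message i goes to subtask (i mod parallelism).
            subtasks.get(i % downstreamParallelism).add(messages.get(i));
        }
        return subtasks;
    }

    public static void main(String[] args) {
        // Three prefetched messages, downstream parallelism of 3.
        System.out.println(distribute(List.of("m1", "m2", "m3"), 3));
        // prints [[m1], [m2], [m3]]
    }
}
```

In this model the three prefetched messages are still read by a single
consumer and only then spread over the three downstream subtasks.
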
Could you please suggest how to configure the prefetch count so that it works
as expected when the job runs with a parallelism greater than 1?
Thanks,
Ajay Pandey