That line is indeed the root of the problem you're seeing: the monitor was written with the implicit assumption that work will generally be available, so it expects to do useful work every time it runs. For a low-throughput pipeline that assumption doesn't hold, and the monitor mostly spins doing useless work.
The behavior could be improved by adding a backoff when the monitor runs but no progress has been made since the last time it attempted to add work. That would free up the CPU and reduce the amount of useless data structures we're creating. I've filed https://issues.apache.org/jira/browse/BEAM-690 to track this; a rough sketch of the kind of backoff I have in mind is below the quoted message.

On Wed, Sep 28, 2016 at 6:45 AM, Demin Alexey <[email protected]> wrote:
> Hi,
>
> I have a problem with my simple program: CPU usage is very high (one core
> is always at 100%).
>
> After a small investigation I found strange behavior in
> ExecutorServiceParallelExecutor.java:396.
>
> The monitor thread reschedules itself without any pause:
>
> if (!shouldShutdown()) {
>   // The monitor thread should always be scheduled; but we only need to be
>   // scheduled once
>   executorService.submit(this);
> }
>
> As a result we have a high load in fireTimers() and big pressure on the GC
> (400 MB/s on a very simple program with an empty topic; most of the memory
> is the creation of new HashMaps).
>
> I'm using the latest version of Beam from git master.
>
> Code to reproduce:
>
> ```
> import com.google.common.collect.ImmutableList;
> import com.google.common.collect.ImmutableMap;
> import org.apache.beam.runners.direct.DirectOptions;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.coders.StringUtf8Coder;
> import org.apache.beam.sdk.io.kafka.KafkaIO;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>
> public class CPUTest {
>
>     public static void main(String[] args) {
>         DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
>         Pipeline pipeline = Pipeline.create(options);
>
>         pipeline.apply(
>             KafkaIO.read()
>                 .withBootstrapServers("localhost:9092")
>                 .withTopics(ImmutableList.of("test"))
>                 // set ConsumerGroup
>                 .updateConsumerProperties(ImmutableMap.of("group.id", "test"))
>                 .withKeyCoder(StringUtf8Coder.of())
>                 .withValueCoder(StringUtf8Coder.of())
>                 .withoutMetadata()
>         );
>
>         pipeline.run();
>     }
> }
> ```
>
> Or is it my mistake in the configuration?
>
> Thanks,
> Alexey Diomin
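
For illustration, here is a minimal, self-contained sketch of the backoff behavior described above. The class and method names are hypothetical and this is not the actual ExecutorServiceParallelExecutor code; it only shows the scheduling pattern: reschedule immediately while work is being done, and with a capped, exponentially growing delay while the pipeline is idle.

```
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/**
 * Illustrative sketch only: a self-rescheduling monitor that backs off when
 * no progress was made, instead of resubmitting itself in a tight loop.
 */
public class BackoffMonitorSketch implements Runnable {
  private static final long MAX_DELAY_MILLIS = 1000L;

  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor();
  // Stand-in for "did this monitor pass produce any work?" (hypothetical hook).
  private final BooleanSupplier workWasDone;
  private final AtomicInteger idlePasses = new AtomicInteger();

  public BackoffMonitorSketch(BooleanSupplier workWasDone) {
    this.workWasDone = workWasDone;
  }

  @Override
  public void run() {
    if (workWasDone.getAsBoolean()) {
      // Progress was made: reset the backoff and run again immediately.
      idlePasses.set(0);
      executor.submit(this);
    } else {
      // Idle: double the delay each pass, capped at MAX_DELAY_MILLIS,
      // so an empty pipeline does not spin a core or churn the GC.
      long delayMillis =
          Math.min(1L << Math.min(idlePasses.incrementAndGet(), 10), MAX_DELAY_MILLIS);
      executor.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
    }
  }

  public void start() {
    executor.submit(this);
  }
}
```

The exact backoff policy (exponential vs. fixed delay, and the cap) is just one option; the important part is that the monitor stops resubmitting itself back-to-back when there has been no progress.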
