Hi.

I have a problem with a simple program:
CPU usage is very high (one core is always at 100%).

After a short investigation I found strange behavior in
ExecutorServiceParallelExecutor.java:396

The monitor thread reschedules itself with the following code, without any pause:

if (!shouldShutdown()) {
  // The monitor thread should always be scheduled; but we only need to be
  // scheduled once
  executorService.submit(this);
}

As a result, there is a high load in fireTimers() and heavy pressure on the GC
(400 MB/s on a very simple program with an empty topic; most of that memory
comes from creating new HashMap instances).
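
To show the effect outside Beam, here is a minimal sketch of the same pattern. It is not Beam's code: the class SelfSchedulingDemo and the method countRuns are hypothetical names chosen for this illustration, and the 10 ms pause is an arbitrary value. It counts how often a task runs in a fixed time window when it resubmits itself immediately, compared with when it reschedules itself after a short delay.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class, not part of Beam.
public class SelfSchedulingDemo {

    // Runs a self-rescheduling task for about runMillis milliseconds and
    // returns how many times it executed. delayMillis == 0 means the task
    // resubmits itself immediately, like the snippet quoted above.
    static long countRuns(long delayMillis, long runMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicLong runs = new AtomicLong();
        CountDownLatch done = new CountDownLatch(1);
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(runMillis);

        Runnable monitor = new Runnable() {
            @Override
            public void run() {
                runs.incrementAndGet();
                if (System.nanoTime() >= deadline) {
                    // Time window is over: stop rescheduling.
                    done.countDown();
                    return;
                }
                if (delayMillis == 0) {
                    // No pause: the task is queued again right away.
                    executor.submit(this);
                } else {
                    // Short pause between two runs of the task.
                    executor.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
                }
            }
        };

        executor.submit(monitor);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("no pause:    " + countRuns(0, 200) + " runs");
        System.out.println("10 ms pause: " + countRuns(10, 200) + " runs");
    }
}
```

In the variant without a pause the worker thread never goes idle, which matches the single core at 100% that I see. Whether a delay is an acceptable change for the real monitor thread is a separate question; the sketch only demonstrates the difference in how often the task runs.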

I am using the latest version of Beam from git master.

Code to reproduce:

```
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CPUTest {

    public static void main(String[] args) {
        DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
        Pipeline pipeline = Pipeline.create(options);

        pipeline.apply(
            KafkaIO.read()
                    .withBootstrapServers("localhost:9092")
                    .withTopics(ImmutableList.of("test"))
                    // set ConsumerGroup
                    .updateConsumerProperties(ImmutableMap.of("group.id", "test"))
                    .withKeyCoder(StringUtf8Coder.of())
                    .withValueCoder(StringUtf8Coder.of())
                    .withoutMetadata()
        );

        pipeline.run();
    }
}
```

Or is this a mistake in my configuration?

Thanks,
Alexey Diomin
