[
https://issues.apache.org/jira/browse/FLINK-25711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jordan Moore updated FLINK-25711:
---------------------------------
Description:
*Details* - Trying to use a generated Avro SpecificRecord subclass with
KafkaIO.read (KafkaIO.write works fine with the same class).
*Problem* - An OOM occurs while constructing the deserializer with
SpecificRecord, but not with GenericRecord. I am also unable to use my
generated class directly, because I get errors saying it cannot be cast to a
GenericRecord (even though it extends/implements GenericRecord through a chain
of other classes).
{code}
2022-01-19 17:17:47,163 DEBUG [main] options.PipelineOptionsFactory$Builder (PipelineOptionsFactory.java:325) - Provided Arguments: {}
2022-01-19 17:17:47,345 DEBUG [main] sdk.Pipeline (Pipeline.java:158) - Creating Pipeline#817686795
2022-01-19 17:17:47,382 DEBUG [main] sdk.Pipeline (Pipeline.java:544) - Adding KafkaIO.Read [KafkaIO.TypedWithoutMetadata] to Pipeline#817686795
2022-01-19 17:17:47,383 DEBUG [main] sdk.Pipeline (Pipeline.java:544) - Adding KafkaIO.Read to Pipeline#817686795
2022-01-19 17:17:47,445 DEBUG [main] coders.CoderRegistry (CoderRegistry.java:635) - Coder for [B: ByteArrayCoder
java.lang.OutOfMemoryError: Java heap space
Dumping heap to /tmp/beam-dump ...
Heap dump file created [1086964638 bytes in 1.315 secs]
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
	at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap$HashEntry.newArray(BoundedConcurrentHashMap.java:247)
	at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap$Segment.<init>(BoundedConcurrentHashMap.java:1200)
	at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap.<init>(BoundedConcurrentHashMap.java:1637)
	at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap.<init>(BoundedConcurrentHashMap.java:1670)
	at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap.<init>(BoundedConcurrentHashMap.java:1654)
	at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap.<init>(BoundedConcurrentHashMap.java:1683)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:181)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:170)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:136)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:98)
	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.lambda$of$282520f2$1(ConfluentSchemaRegistryDeserializerProvider.java:93)
	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider$$Lambda$70/1932332324.apply(Unknown Source)
	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getSchemaRegistryClient(ConfluentSchemaRegistryDeserializerProvider.java:134)
	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getSchemaMetadata(ConfluentSchemaRegistryDeserializerProvider.java:126)
	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getAvroSchema(ConfluentSchemaRegistryDeserializerProvider.java:120)
	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getCoder(ConfluentSchemaRegistryDeserializerProvider.java:116)
	at org.apache.beam.sdk.io.kafka.KafkaIO$Read.getValueCoder(KafkaIO.java:1476)
	at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:1256)
	at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:605)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
	at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
	at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata.expand(KafkaIO.java:1555)
	at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata.expand(KafkaIO.java:1529)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
	at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
	at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:177)
	at cricket.jmoore.jmx.Main.main(Main.java:98)
{code}
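Reading the trace, the allocation happens inside the CachedSchemaRegistryClient
constructor while it builds its BoundedConcurrentHashMap-backed caches. My
guess (unverified) is that an oversized identity-map capacity is being passed
through; if that is right, the following standalone snippet should hit the same
allocation failure without Beam or Kafka involved at all:
{code}
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

public class SchemaRegistryOomRepro {
    public static void main(String[] args) {
        // ASSUMPTION (unverified): the OOM comes from an oversized cache
        // capacity. kafka-schema-registry-client 7.x backs its caches with
        // BoundedConcurrentHashMap, which sizes its internal segment arrays
        // from this capacity, so a huge value forces a huge eager allocation
        // matching the BoundedConcurrentHashMap frames in the trace above.
        new CachedSchemaRegistryClient("http://localhost:8081", Integer.MAX_VALUE);
    }
}
{code}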
Small example, run against Kafka and Confluent Schema Registry locally:
{code}
public static void main(String[] args) throws Exception {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    // Pipeline p = getWritePipeline(options);
    Pipeline p = Pipeline.create(options);

    final String topic = "foobar-2";
    final SubjectNameStrategy subjectStrategy = new TopicNameStrategy();
    final String valueSubject = subjectStrategy.subjectName(topic, false, null); // schema not used
    final ConfluentSchemaRegistryDeserializerProvider<SpecificRecord> valueProvider =
        ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", valueSubject, null,
            // TODO: This doesn't seem to work to get the SpecificRecord subclass in the apply function below
            ImmutableMap.of(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true));

    p
        .apply(KafkaIO.<byte[], SpecificRecord>read()
            .withBootstrapServers("localhost:9092")
            .withTopic(topic)
            .withKeyDeserializer(ByteArrayDeserializer.class) // Don't have any keys, but this is required
            .withValueDeserializer(valueProvider)
            .withConsumerConfigUpdates(ImmutableMap.of(
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                OffsetResetStrategy.EARLIEST.name().toLowerCase(Locale.ROOT),
                ConsumerConfig.GROUP_ID_CONFIG, "beam-" + UUID.randomUUID()
            ))
            .withoutMetadata()
        ).apply(Values.create())
        // TODO: How to get the SpecificRecord subclass here?
        .apply(MapElements.via(new SimpleFunction<SpecificRecord, Void>() {
            @Override
            public Void apply(SpecificRecord input) {
                log.info("{}", input);
                return null;
            }
        }));

    p.run().waitUntilFinish();
}
{code}
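Since GenericRecord does not hit the OOM for me, the only workaround I can
think of is to read values as GenericRecord and convert each element to the
generated class afterwards. Untested sketch (Product is the class generated
from the schema below):
{code}
// Untested workaround sketch: read as GenericRecord, then convert each record
// to the generated Product class via SpecificData, which instantiates the
// specific class registered for the record's schema during deepCopy.
ConfluentSchemaRegistryDeserializerProvider<GenericRecord> genericProvider =
    ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", valueSubject);

p.apply(KafkaIO.<byte[], GenericRecord>read()
        .withBootstrapServers("localhost:9092")
        .withTopic(topic)
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(genericProvider)
        .withoutMetadata())
    .apply(Values.create())
    .apply(MapElements.via(new SimpleFunction<GenericRecord, Product>() {
        @Override
        public Product apply(GenericRecord input) {
            return (Product) SpecificData.get().deepCopy(Product.getClassSchema(), input);
        }
    }))
    // Coder inference may not find the generated class on its own
    .setCoder(AvroCoder.of(Product.class));
{code}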
Avro schema that I am using; it generates a class Product.java that I would
like to use in place of SpecificRecord above:
{code}
{"type":"record","name":"Product","namespace":"cricket.jmoore.avro","fields":[{"name":"name","type":"string"}]}
{code}
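For reference, the class generated from this schema looks roughly like the
sketch below (trimmed; exact output depends on the Avro version). This is the
"chain of other classes" mentioned above: SpecificRecordBase implements
SpecificRecord and, in recent Avro versions, GenericRecord as well, so I
expected the cast to work.
{code}
package cricket.jmoore.avro;

// Rough sketch of the avro-maven-plugin output, not the actual generated file.
public class Product extends org.apache.avro.specific.SpecificRecordBase
        implements org.apache.avro.specific.SpecificRecord {

    public static final org.apache.avro.Schema SCHEMA$ =
        new org.apache.avro.Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Product\",\"namespace\":\"cricket.jmoore.avro\","
                + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");

    private CharSequence name;

    @Override public org.apache.avro.Schema getSchema() { return SCHEMA$; }
    @Override public Object get(int field) { return name; }
    @Override public void put(int field, Object value) { name = (CharSequence) value; }
}
{code}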
*Beam Version*: 2.35.0
Dependencies:
{code}
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version> <!-- 2.35.0 -->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
{code}
> OOM / buffer-overflow on KafkaIO.read using SpecificRecord
> ----------------------------------------------------------
>
> Key: FLINK-25711
> URL: https://issues.apache.org/jira/browse/FLINK-25711
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Reporter: Jordan Moore
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)