[ https://issues.apache.org/jira/browse/BEAM-7336?focusedWorklogId=393187&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393187 ]
ASF GitHub Bot logged work on BEAM-7336:
----------------------------------------
Author: ASF GitHub Bot
Created on: 26/Feb/20 05:24
Start Date: 26/Feb/20 05:24
Worklog Time Spent: 10m
Work Description: iemejia commented on pull request #10966:
[WIP][BEAM-7336] Add schema inferring for KafkaIO when reading Avro values
URL: https://github.com/apache/beam/pull/10966#discussion_r384277752
##########
File path: sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
##########
@@ -492,6 +498,45 @@ public void testReadAvroSpecificRecordsWithConfluentSchemaRegistry() {
p.run();
}
+ @Test
+ public void testReadValuesAsAvroGenericRecordsWithSqlQuery() {
+ int numElements = 100;
+ String topic = "my_topic";
+ String schemaRegistryUrl = "mock://my-scope-name";
+ String valueSchemaSubject = topic + "-value";
+
+ Schema schema = AvroUtils.getSchema(GenericRecord.class, AVRO_SCHEMA);
+ List<Row> outputs = new ArrayList<>();
+ for (int i = 0; i < numElements; i++) {
+ outputs.add(Row.withSchema(schema).addValues("ValueName" + i, i, "color" + i).build());
+ }
+
+ PTransform<PBegin, PCollection<GenericRecord>> reader =
+ KafkaIO.<Integer, GenericRecord>read()
+ .withBootstrapServers("localhost:9092")
+ .withTopic(topic)
+ .withKeyDeserializer(IntegerDeserializer.class)
+ .withValueDeserializer(mockDeserializerProvider(schemaRegistryUrl, valueSchemaSubject))
+ .withConsumerFactoryFn(
+ new ConsumerFactoryFn(
+ ImmutableList.of(topic),
+ 1,
+ numElements,
+ OffsetResetStrategy.EARLIEST,
+ i -> ByteBuffer.wrap(new byte[4]).putInt(i).array(),
+ new ValueAvroSerializableFunction(topic, schemaRegistryUrl)))
+ .withMaxNumRecords(numElements)
+ .withAvroSchemaValues();
Review comment:
You can replace this with `.withoutMetadata()` chained with `Keys.create()`
and/or `Values.create()` to extract the respective keys or values to be queried afterwards.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 393187)
Time Spent: 50m (was: 40m)
> KafkaIO should support inferring schemas when reading Avro
> ----------------------------------------------------------
>
> Key: BEAM-7336
> URL: https://issues.apache.org/jira/browse/BEAM-7336
> Project: Beam
> Issue Type: Sub-task
> Components: io-java-kafka
> Reporter: Reuven Lax
> Assignee: Alexey Romanenko
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> PubSubIO already supports this.
> It would also be nice to be able to look up Avro schemas in the Kafka schema
> registry.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)