robertowm opened a new issue, #26826:
URL: https://github.com/apache/beam/issues/26826
### What happened?
Beam fails to connect to Kafka, as it can't call `setConsumerConfig`
correctly. Issue may be related to failing to transform json (and `Map`) to
Java `Map<String, Object>`. It will transform to
`org.apache.beam.sdk.values.Row`, which leads to `Caused by:
java.lang.IllegalArgumentException: The configuration class class
org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration is missing a
setter setConsumerConfig for consumerConfig with type
org.apache.beam.sdk.values.Row`.
Also, the following error may be related - not sure if there is anything
missing.
```
Error: java.lang.RuntimeException: Failed to get dependencies of
beam:transform:org.apache.beam:kafka_read_without_metadata:v1 from spec urn:
"beam:transform:org.apache.beam:kafka_read_without_metadata:v1"
payload:
"\n\225\002\n\024\n\006topics\032\n\b\001\032\006\n\004\b\001\020\a\n\206\001\n\016consumerConfig\032t\b\0012p\nn\n\031\n\021bootstrap.servers\032\004\b\001\020\a\n\020\n\bgroup.id\032\004\b\001\020\a\n\031\n\021auto.offset.reset\032\004\b\001\020\a\022$c80c7612-2b23-4d6b-8c41-1e019afbd0d1\n\030\n\020key_deserializer\032\004\b\001\020\a\n\032\n\022value_deserializer\032\004\b\001\020\a\n\030\n\020timestamp_policy\032\004\b\001\020\a\022$34205f54-77f4-4001-93cc-0a0e732d5303\022\261\001\005\000\000\000\000\001\001\amytopic\003\000\016127.0.0.1:9093\006ts-001\bearliest8org.apache.kafka.common.serialization.StringDeserializer8org.apache.kafka.common.serialization.StringDeserializer\016ProcessingTime"
```
Code snippet to reproduce error:
```
import { readFromKafka, ReadFromKafkaOptions } from 'apache-beam/io/kafka';
export function createPipeline() {
// A pipeline is simply a callable that takes a root object.
return (root: beam.Root) => {
// same results if using `Map` - it always transform to
`org.apache.beam.sdk.values.Row`
const consumerConfig = {
'bootstrap.servers': '127.0.0.1:9093',
'group.id': 'ts-001',
"auto.offset.reset":"earliest",
};
const topics = ['mytopic'];
// same issue if not providing `options` (default value: `{}`)
const options : ReadFromKafkaOptions = {
keyDeserializer:
"org.apache.kafka.common.serialization.StringDeserializer",
valueDeserializer:
"org.apache.kafka.common.serialization.StringDeserializer",
};
const kafkaReader = readFromKafka(consumerConfig, topics, options);
return root.applyAsync(kafkaReader)
.then(events => events.map((element) => {
console.log(element);
return element;
}));
};
}
```
Output:
```
java [
'-jar',
'/home/local/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.47.0.jar',
'39753',
'--filesToStage=/home/local/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.47.0.jar'
]
Waiting for sdks:java:io:expansion-service:shadowJar to be available on port
39753.
Starting expansion service at localhost:39753
May 23, 2023 9:51:15 AM
org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
INFO: Registering external transforms:
[beam:transform:org.apache.beam:kafka_read_with_metadata:v1,
beam:transform:org.apache.beam:kafka_read_without_metadata:v1,
beam:transform:org.apache.beam:kafka_write:v1,
beam:external:java:generate_sequence:v1]
Registered transforms:
beam:transform:org.apache.beam:kafka_read_with_metadata:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5af97850
beam:transform:org.apache.beam:kafka_read_without_metadata:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5ef60048
beam:transform:org.apache.beam:kafka_write:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@1d548a08
beam:external:java:generate_sequence:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@16aa0a0a
Registered SchemaTransformProviders:
beam:schematransform:org.apache.beam:kafka_read:v1
beam:schematransform:org.apache.beam:kafka_write:v1
Service sdks:java:io:expansion-service:shadowJar available.
May 23, 2023 9:51:15 AM
org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO: Expanding 'readFromKafkaWithMetadata' with URN
'beam:transform:org.apache.beam:kafka_read_without_metadata:v1'
May 23, 2023 9:51:15 AM
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader
payloadToConfig
WARNING: Configuration class
'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no
schema registered. Attempting to construct with setter approach.
Tearing down sdks:java:io:expansion-service:shadowJar.
Error: java.lang.RuntimeException: Failed to get dependencies of
beam:transform:org.apache.beam:kafka_read_without_metadata:v1 from spec urn:
"beam:transform:org.apache.beam:kafka_read_without_metadata:v1"
payload:
"\n\225\002\n\024\n\006topics\032\n\b\001\032\006\n\004\b\001\020\a\n\206\001\n\016consumerConfig\032t\b\0012p\nn\n\031\n\021bootstrap.servers\032\004\b\001\020\a\n\020\n\bgroup.id\032\004\b\001\020\a\n\031\n\021auto.offset.reset\032\004\b\001\020\a\022$c80c7612-2b23-4d6b-8c41-1e019afbd0d1\n\030\n\020key_deserializer\032\004\b\001\020\a\n\032\n\022value_deserializer\032\004\b\001\020\a\n\030\n\020timestamp_policy\032\004\b\001\020\a\022$34205f54-77f4-4001-93cc-0a0e732d5303\022\261\001\005\000\000\000\000\001\001\amytopic\003\000\016127.0.0.1:9093\006ts-001\bearliest8org.apache.kafka.common.serialization.StringDeserializer8org.apache.kafka.common.serialization.StringDeserializer\016ProcessingTime"
at
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1.getDependencies(ExpansionService.java:170)
at
org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:522)
at
org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:606)
at
org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:305)
at
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
at
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:354)
at
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
at
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at
org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: The configuration class class
org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration is missing a
setter setConsumerConfig for consumerConfig with type
org.apache.beam.sdk.values.Row
at
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.payloadToConfigSetters(ExpansionService.java:322)
at
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.payloadToConfig(ExpansionService.java:265)
at
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1.getDependencies(ExpansionService.java:162)
... 11 more
Caused by: java.lang.NoSuchMethodException:
org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration.setConsumerConfig(org.apache.beam.sdk.values.Row)
at java.lang.Class.getMethod(Class.java:1786)
at
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.payloadToConfigSetters(ExpansionService.java:319)
... 13 more
at RawExternalTransform.expandInternalAsync
(/home/local/Projects/cdc-experiments/beam-typescript/node_modules/apache-beam/dist/src/apache_beam/transforms/external.js:173:23)
at process.processTicksAndRejections
(node:internal/process/task_queues:95:5)
at async Pipeline.applyAsyncTransform
(/home/local/Projects/cdc-experiments/beam-typescript/node_modules/apache-beam/dist/src/apache_beam/internal/pipeline.js:202:22)
at async Root.applyAsync
(/home/local/Projects/cdc-experiments/beam-typescript/node_modules/apache-beam/dist/src/apache_beam/pvalue.js:130:16)
at async Runner.runAsync
(/home/local/Projects/cdc-experiments/beam-typescript/node_modules/apache-beam/dist/src/apache_beam/runners/runner.js:164:9)
at async Runner.run
(/home/local/Projects/cdc-experiments/beam-typescript/node_modules/apache-beam/dist/src/apache_beam/runners/runner.js:149:32)
at async $919eefa079760b8d$export$8ae7a44ba86142d6
(/home/local/Projects/cdc-experiments/beam-typescript/dist/main.js:41:5)
at async $882b6d93070905b3$var$main
(/home/local/Projects/cdc-experiments/beam-typescript/dist/main.js:63:5)
```
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [ ] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [X] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]