robertowm opened a new issue, #26826:
URL: https://github.com/apache/beam/issues/26826

   ### What happened?
   
   Beam fails to connect to Kafka because the expansion service cannot find a
matching `setConsumerConfig` setter. The issue may be related to the SDK failing
to convert a JSON object (or a `Map`) to a Java `Map<String, Object>`. The value
is instead converted to an `org.apache.beam.sdk.values.Row`, which leads to
`Caused by: java.lang.IllegalArgumentException: The configuration class class
org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration is missing a
setter setConsumerConfig for consumerConfig with type
org.apache.beam.sdk.values.Row`.
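
To make the row/map distinction concrete, here is a minimal sketch of how a value could be classified before it is sent to the expansion service. `FieldType` and `inferType` are hypothetical names used only for illustration; this is not the SDK's actual schema inference, and in the run reported here both the plain object and the `Map` ended up as a `Row`.

```typescript
// Hypothetical, simplified type descriptors; NOT the Beam SDK's real schema types.
type FieldType =
  | { kind: "string" }
  | { kind: "map"; key: FieldType; value: FieldType }
  | { kind: "row"; fields: Record<string, FieldType> };

// Hypothetical inference: a Map becomes a map type, while a plain object
// becomes a row with one named field per key.
function inferType(value: unknown): FieldType {
  if (typeof value === "string") {
    return { kind: "string" };
  }
  if (value instanceof Map) {
    const entries = Array.from(value.entries());
    if (entries.length === 0) {
      throw new Error("cannot infer the type of an empty Map");
    }
    // Assumes all entries share the type of the first entry.
    return {
      kind: "map",
      key: inferType(entries[0][0]),
      value: inferType(entries[0][1]),
    };
  }
  if (typeof value === "object" && value !== null) {
    const record = value as Record<string, unknown>;
    const fields: Record<string, FieldType> = {};
    Object.keys(record).forEach((name) => {
      fields[name] = inferType(record[name]);
    });
    return { kind: "row", fields };
  }
  throw new Error(`unsupported value: ${String(value)}`);
}

const asObject = {
  "bootstrap.servers": "127.0.0.1:9093",
  "group.id": "ts-001",
};
const asMap = new Map<string, string>([
  ["bootstrap.servers", "127.0.0.1:9093"],
  ["group.id", "ts-001"],
]);

console.log(inferType(asObject).kind); // row
console.log(inferType(asMap).kind); // map
```

Under this assumed scheme, only the `map` result would line up with a Java setter that accepts a `Map`, which is what this report assumes `setConsumerConfig` takes. The `row` result corresponds to the `Row`-typed setter lookup that fails in the stack trace.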
   
   Also, the following error may be related; I am not sure whether anything is
missing.
   ```
   Error: java.lang.RuntimeException: Failed to get dependencies of beam:transform:org.apache.beam:kafka_read_without_metadata:v1 from spec urn: "beam:transform:org.apache.beam:kafka_read_without_metadata:v1"
   payload: 
"\n\225\002\n\024\n\006topics\032\n\b\001\032\006\n\004\b\001\020\a\n\206\001\n\016consumerConfig\032t\b\0012p\nn\n\031\n\021bootstrap.servers\032\004\b\001\020\a\n\020\n\bgroup.id\032\004\b\001\020\a\n\031\n\021auto.offset.reset\032\004\b\001\020\a\022$c80c7612-2b23-4d6b-8c41-1e019afbd0d1\n\030\n\020key_deserializer\032\004\b\001\020\a\n\032\n\022value_deserializer\032\004\b\001\020\a\n\030\n\020timestamp_policy\032\004\b\001\020\a\022$34205f54-77f4-4001-93cc-0a0e732d5303\022\261\001\005\000\000\000\000\001\001\amytopic\003\000\016127.0.0.1:9093\006ts-001\bearliest8org.apache.kafka.common.serialization.StringDeserializer8org.apache.kafka.common.serialization.StringDeserializer\016ProcessingTime"
   ```
   
   Code snippet to reproduce error:
   ```
   import * as beam from 'apache-beam';
   import { readFromKafka, ReadFromKafkaOptions } from 'apache-beam/io/kafka';
   
   export function createPipeline() {
     // A pipeline is simply a callable that takes a root object.
     return (root: beam.Root) => {
       // same result if using `Map` - it is always transformed to `org.apache.beam.sdk.values.Row`
       const consumerConfig = {
         'bootstrap.servers': '127.0.0.1:9093',
         'group.id': 'ts-001',
         'auto.offset.reset': 'earliest',
       };
       const topics = ['mytopic'];
       // same issue if not providing `options` (default value: `{}`)
       const options: ReadFromKafkaOptions = {
         keyDeserializer:
           "org.apache.kafka.common.serialization.StringDeserializer",
         valueDeserializer:
           "org.apache.kafka.common.serialization.StringDeserializer",
       };
   
       const kafkaReader = readFromKafka(consumerConfig, topics, options);
       return root.applyAsync(kafkaReader)
         .then(events => events.map((element) => {
           console.log(element);
           return element;
         }));
     };
   }
   ```
   
   Output:
   ```
   java [
     '-jar',
     '/home/local/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.47.0.jar',
     '39753',
     '--filesToStage=/home/local/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.47.0.jar'
   ]
   Waiting for sdks:java:io:expansion-service:shadowJar to be available on port 39753.
   Starting expansion service at localhost:39753
   May 23, 2023 9:51:15 AM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
   INFO: Registering external transforms: [beam:transform:org.apache.beam:kafka_read_with_metadata:v1, beam:transform:org.apache.beam:kafka_read_without_metadata:v1, beam:transform:org.apache.beam:kafka_write:v1, beam:external:java:generate_sequence:v1]
   
   Registered transforms:
        beam:transform:org.apache.beam:kafka_read_with_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5af97850
        beam:transform:org.apache.beam:kafka_read_without_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5ef60048
        beam:transform:org.apache.beam:kafka_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@1d548a08
        beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@16aa0a0a
   
   Registered SchemaTransformProviders:
        beam:schematransform:org.apache.beam:kafka_read:v1
        beam:schematransform:org.apache.beam:kafka_write:v1
   Service sdks:java:io:expansion-service:shadowJar available.
   May 23, 2023 9:51:15 AM org.apache.beam.sdk.expansion.service.ExpansionService expand
   INFO: Expanding 'readFromKafkaWithMetadata' with URN 'beam:transform:org.apache.beam:kafka_read_without_metadata:v1'
   May 23, 2023 9:51:15 AM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig
   WARNING: Configuration class 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no schema registered. Attempting to construct with setter approach.
   Tearing down sdks:java:io:expansion-service:shadowJar.
   Error: java.lang.RuntimeException: Failed to get dependencies of beam:transform:org.apache.beam:kafka_read_without_metadata:v1 from spec urn: "beam:transform:org.apache.beam:kafka_read_without_metadata:v1"
   payload: 
"\n\225\002\n\024\n\006topics\032\n\b\001\032\006\n\004\b\001\020\a\n\206\001\n\016consumerConfig\032t\b\0012p\nn\n\031\n\021bootstrap.servers\032\004\b\001\020\a\n\020\n\bgroup.id\032\004\b\001\020\a\n\031\n\021auto.offset.reset\032\004\b\001\020\a\022$c80c7612-2b23-4d6b-8c41-1e019afbd0d1\n\030\n\020key_deserializer\032\004\b\001\020\a\n\032\n\022value_deserializer\032\004\b\001\020\a\n\030\n\020timestamp_policy\032\004\b\001\020\a\022$34205f54-77f4-4001-93cc-0a0e732d5303\022\261\001\005\000\000\000\000\001\001\amytopic\003\000\016127.0.0.1:9093\006ts-001\bearliest8org.apache.kafka.common.serialization.StringDeserializer8org.apache.kafka.common.serialization.StringDeserializer\016ProcessingTime"
   
        at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1.getDependencies(ExpansionService.java:170)
        at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:522)
        at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:606)
        at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:305)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:354)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: java.lang.IllegalArgumentException: The configuration class class org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration is missing a setter setConsumerConfig for consumerConfig with type org.apache.beam.sdk.values.Row
        at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.payloadToConfigSetters(ExpansionService.java:322)
        at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.payloadToConfig(ExpansionService.java:265)
        at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1.getDependencies(ExpansionService.java:162)
        ... 11 more
   Caused by: java.lang.NoSuchMethodException: org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration.setConsumerConfig(org.apache.beam.sdk.values.Row)
        at java.lang.Class.getMethod(Class.java:1786)
        at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.payloadToConfigSetters(ExpansionService.java:319)
        ... 13 more
   
       at RawExternalTransform.expandInternalAsync (/home/local/Projects/cdc-experiments/beam-typescript/node_modules/apache-beam/dist/src/apache_beam/transforms/external.js:173:23)
       at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
       at async Pipeline.applyAsyncTransform (/home/local/Projects/cdc-experiments/beam-typescript/node_modules/apache-beam/dist/src/apache_beam/internal/pipeline.js:202:22)
       at async Root.applyAsync (/home/local/Projects/cdc-experiments/beam-typescript/node_modules/apache-beam/dist/src/apache_beam/pvalue.js:130:16)
       at async Runner.runAsync (/home/local/Projects/cdc-experiments/beam-typescript/node_modules/apache-beam/dist/src/apache_beam/runners/runner.js:164:9)
       at async Runner.run (/home/local/Projects/cdc-experiments/beam-typescript/node_modules/apache-beam/dist/src/apache_beam/runners/runner.js:149:32)
       at async $919eefa079760b8d$export$8ae7a44ba86142d6 (/home/local/Projects/cdc-experiments/beam-typescript/dist/main.js:41:5)
       at async $882b6d93070905b3$var$main (/home/local/Projects/cdc-experiments/beam-typescript/dist/main.js:63:5)
   ```
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [X] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

