ffernandez92 commented on issue #28664:
URL: https://github.com/apache/beam/issues/28664#issuecomment-1764172154

   I was trying to run the previous code in Dataflow, but I'm running into some
issues. Maybe you can help.
   
   1 - I built the expansion service jar with
`./gradlew :sdks:java:io:expansion-service:build`, and also built the Python
SDK and the Java Docker image.
   
   2 - Ran the following command: 
   ```
   python3 -m apache_beam.yaml.main \
     --runner=DataflowRunner \
     --project={project} \
     --region=us-east4 \
     --temp_location=gs://{gcs_temp_loc}/temp/ \
     --pipeline_spec_file=/home/ferran_fernandez/read_k5.yml \
     --staging_location=gs://{gcs_staging_loc}/staging \
     --sdk_location="/home/ferran_fernandez/beam/sdks/python/dist/apache-beam-2.52.0.dev0.tar.gz" \
     --sdk_harness_container_image_overrides=".*java.*,us-east4-docker.pkg.dev/{project}/beam/beam_java8_sdk:latest" \
     --streaming
   ```
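
For readability, the same invocation can also be assembled as an argument list, one flag per entry. This is only a sketch: `build_yaml_main_command` is a hypothetical helper (not part of Beam), and the project and bucket arguments are stand-ins for the `{...}` placeholders in the command above.

```python
import shlex
from typing import List


def build_yaml_main_command(project: str, temp_bucket: str,
                            staging_bucket: str) -> List[str]:
    """Assemble the argument list for the command above (hypothetical helper)."""
    # Assumption: the image path follows the pattern used in the command above.
    java_image = f"us-east4-docker.pkg.dev/{project}/beam/beam_java8_sdk:latest"
    return [
        "python3", "-m", "apache_beam.yaml.main",
        "--runner=DataflowRunner",
        f"--project={project}",
        "--region=us-east4",
        f"--temp_location=gs://{temp_bucket}/temp/",
        "--pipeline_spec_file=/home/ferran_fernandez/read_k5.yml",
        f"--staging_location=gs://{staging_bucket}/staging",
        "--sdk_location=/home/ferran_fernandez/beam/sdks/python/dist/"
        "apache-beam-2.52.0.dev0.tar.gz",
        # Regex for the Java harness image, then the replacement image.
        f"--sdk_harness_container_image_overrides=.*java.*,{java_image}",
        "--streaming",
    ]


# Placeholder values; substitute real project and bucket names.
cmd = build_yaml_main_command("my-project", "my-temp-bucket", "my-staging-bucket")
print(shlex.join(cmd))
```

Passing the list to `subprocess.run(cmd, check=True)` avoids shell quoting problems with the `.*java.*` pattern.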
   
   I got the following error:
   ```
   INFO:root:Starting a JAR-based expansion service from JAR 
/home/ferran_fernandez/beam/sdks/java/io/expansion-service/build/libs/beam-sdks-java-io-expansion-service-2.52.0-SNAPSHOT.jar
 
   INFO:apache_beam.utils.subprocess_server:Starting service with ['java' 
'-jar' 
'/home/ferran_fernandez/beam/sdks/java/io/expansion-service/build/libs/beam-sdks-java-io-expansion-service-2.52.0-SNAPSHOT.jar'
 '53199' 
'--filesToStage=/home/ferran_fernandez/beam/sdks/java/io/expansion-service/build/libs/beam-sdks-java-io-expansion-service-2.52.0-SNAPSHOT.jar']
   INFO:apache_beam.utils.subprocess_server:Starting expansion service at 
localhost:53199
   INFO:apache_beam.utils.subprocess_server:Oct 16, 2023 10:16:36 AM 
org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
   INFO:apache_beam.utils.subprocess_server:INFO: Registering external 
transforms: [beam:transform:org.apache.beam:kafka_read_with_metadata:v1, 
beam:transform:org.apache.beam:kafka_read_without_metadata:v1, 
beam:transform:org.apache.beam:kafka_write:v1, 
beam:external:java:generate_sequence:v1, beam:transform:combine_globally:v1, 
beam:transform:combine_grouped_values:v1, beam:transform:combine_per_key:v1, 
beam:transform:create_view:v1, beam:transform:flatten:v1, 
beam:transform:group_by_key:v1, beam:transform:group_into_batches:v1, 
beam:transform:group_into_batches_with_sharded_key:v1, 
beam:transform:impulse:v1, beam:transform:reshuffle:v1, 
beam:transform:sdf_process_keyed_elements:v1, beam:transform:teststream:v1, 
beam:transform:window_into:v1, beam:transform:write_files:v1]
   INFO:apache_beam.utils.subprocess_server:
   INFO:apache_beam.utils.subprocess_server:Registered transforms:
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:org.apache.beam:kafka_read_with_metadata:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@7c469c48
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:org.apache.beam:kafka_read_without_metadata:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@12e61fe6
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:org.apache.beam:kafka_write:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@7ee955a8
   INFO:apache_beam.utils.subprocess_server:       
beam:external:java:generate_sequence:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@1677d1
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:combine_globally:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@48fa0f47
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:combine_grouped_values:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@6ac13091
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:combine_per_key:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@5e316c74
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:create_view:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@6d2a209c
   INFO:apache_beam.utils.subprocess_server:       beam:transform:flatten:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@75329a49
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:group_by_key:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@161479c6
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:group_into_batches:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@4313f5bc
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:group_into_batches_with_sharded_key:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@7f010382
   INFO:apache_beam.utils.subprocess_server:       beam:transform:impulse:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@1e802ef9
   INFO:apache_beam.utils.subprocess_server:       beam:transform:reshuffle:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@2b6faea6
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:sdf_process_keyed_elements:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@778d1062
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:teststream:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@670002
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:window_into:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@1f0f1111
   INFO:apache_beam.utils.subprocess_server:       
beam:transform:write_files:v1: 
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@49c386c8
   INFO:apache_beam.utils.subprocess_server:
   INFO:apache_beam.utils.subprocess_server:Registered SchemaTransformProviders:
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:kafka_read:v1
   INFO:apache_beam.utils.subprocess_server:       
beam:schematransform:org.apache.beam:kafka_write:v1
   INFO:apache_beam.utils.subprocess_server:Oct 16, 2023 10:16:37 AM 
org.apache.beam.sdk.expansion.service.ExpansionService expand
   INFO:apache_beam.utils.subprocess_server:INFO: Expanding 
'ReadFromKafka/beam:schematransform:org.apache.beam:kafka_read:v1' with URN 
'beam:expansion:payload:schematransform:v1'
   INFO:apache_beam.utils.subprocess_server:Dependencies list: {}
   Traceback (most recent call last):
     File 
"/home/ferran_fernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", 
line 452, in expand_leaf_transform
       outputs = inputs | scope.unique_name(spec, ptransform) >> ptransform
     File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pvalue.py", line 
137, in __or__
       return self.pipeline.apply(ptransform, self)
     File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pipeline.py", 
line 651, in apply
       return self.apply(
     File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pipeline.py", 
line 662, in apply
       return self.apply(transform, pvalueish)
     File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pipeline.py", 
line 708, in apply
       pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
     File 
"/home/ferran_fernandez/beam/sdks/python/apache_beam/runners/runner.py", line 
203, in apply
       return self.apply_PTransform(transform, input, options)
     File 
"/home/ferran_fernandez/beam/sdks/python/apache_beam/runners/runner.py", line 
207, in apply_PTransform
       return transform.expand(input)
     File 
"/home/ferran_fernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", 
line 370, in recording_expand
       result = original_expand(pvalue)
     File 
"/home/ferran_fernandez/beam/sdks/python/apache_beam/transforms/external.py", 
line 425, in expand
       return pcolls | self._payload_builder.identifier() >> ExternalTransform(
     File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pvalue.py", line 
137, in __or__
       return self.pipeline.apply(ptransform, self)
     File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pipeline.py", 
line 651, in apply
       return self.apply(
     File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pipeline.py", 
line 662, in apply
       return self.apply(transform, pvalueish)
     File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pipeline.py", 
line 708, in apply
       pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
     File 
"/home/ferran_fernandez/beam/sdks/python/apache_beam/runners/runner.py", line 
203, in apply
       return self.apply_PTransform(transform, input, options)
     File 
"/home/ferran_fernandez/beam/sdks/python/apache_beam/runners/runner.py", line 
207, in apply_PTransform
       return transform.expand(input)
     File 
"/home/ferran_fernandez/beam/sdks/python/apache_beam/transforms/external.py", 
line 730, in expand
       raise RuntimeError(response.error)
   RuntimeError: java.lang.IllegalStateException: Missing required properties: 
topicPattern keyCoder valueCoder watermarkFn maxReadTime startReadTime 
stopReadTime watchTopicPartitionDuration offsetConsumerConfig 
keyDeserializerProvider valueDeserializerProvider checkStopReadingFn
           at 
org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Read$Builder.build(AutoValue_KafkaIO_Read.java:585)
           at org.apache.beam.sdk.io.kafka.KafkaIO.read(KafkaIO.java:574)
           at org.apache.beam.sdk.io.kafka.KafkaIO.readBytes(KafkaIO.java:553)
           at 
org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformProvider$3.expand(KafkaReadSchemaTransformProvider.java:131)
           at 
org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformProvider$3.expand(KafkaReadSchemaTransformProvider.java:127)
           at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
           at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:496)
           at 
org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:467)
           at 
org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:620)
           at 
org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:704)
           at 
org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:306)
           at 
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
           at 
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:355)
           at 
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:867)
           at 
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
           at 
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:750)
   ```
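
To compare this failure across different builds, the property names listed in the exception can be pulled out with a short script. This is a minimal sketch: `missing_properties` is a hypothetical helper, not a Beam API, and the sample text is copied from the error above.

```python
import re
from typing import List


def missing_properties(error_text: str) -> List[str]:
    """Return the names listed after 'Missing required properties:'."""
    # Capture everything up to the first stack frame ("at ...") or end of text.
    match = re.search(
        r"Missing required properties:\s*(.*?)(?=\s+at\s|\Z)",
        error_text,
        flags=re.DOTALL,
    )
    if match is None:
        return []
    return match.group(1).split()


# Sample text copied from the error above (first stack frame only).
error = (
    "java.lang.IllegalStateException: Missing required properties: "
    "topicPattern keyCoder valueCoder watermarkFn maxReadTime startReadTime "
    "stopReadTime watchTopicPartitionDuration offsetConsumerConfig "
    "keyDeserializerProvider valueDeserializerProvider checkStopReadingFn\n"
    "\tat org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Read$Builder.build"
    "(AutoValue_KafkaIO_Read.java:585)"
)

props = missing_properties(error)
print(len(props), "missing properties:", ", ".join(props))
```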
   
   I tried rolling back my changes and using the original code, but I got the
same result. My guess is that I'm not building this the right way (most likely
it has to do with the AutoValue-generated classes, but I'm not sure).
   
   Is there a specific gradlew command I should be using?

