ffernandez92 commented on issue #28664:
URL: https://github.com/apache/beam/issues/28664#issuecomment-1764172154
I was trying to run the previous code on Dataflow, but I'm running into some
issues. Maybe you guys can help.
1 - I built the expansion service JAR with
`./gradlew :sdks:java:io:expansion-service:build`, as well as the Python SDK
and the Java Docker image.
2 - I ran the following command:
```
python3 -m apache_beam.yaml.main --runner=DataflowRunner --project={project} \
  --region=us-east4 --temp_location=gs://{gcs_temp_loc}/temp/ \
  --pipeline_spec_file=/home/ferran_fernandez/read_k5.yml \
  --staging_location=gs://{gcs_staging_loc}/staging \
  --sdk_location="/home/ferran_fernandez/beam/sdks/python/dist/apache-beam-2.52.0.dev0.tar.gz" \
  --sdk_harness_container_image_overrides=".*java.*,us-east4-docker.pkg.dev/{project}/beam/beam_java8_sdk:latest" \
  --streaming
```
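For anyone reproducing this, the same invocation can also be assembled as an argument list (one element per flag), which sidesteps shell line continuations and quoting of the `.*java.*` regex. This is a minimal sketch: the helper name `yaml_dataflow_args` and all example values are hypothetical placeholders, and it only mirrors the flags from the command above.

```python
import shlex


def yaml_dataflow_args(project, region, temp_bucket, staging_bucket,
                       spec_file, sdk_tarball, java_image):
    """Build the argv for the Beam YAML launcher used in step 2.

    Hypothetical helper: it only mirrors the flags from the command above.
    """
    return [
        "python3", "-m", "apache_beam.yaml.main",
        "--runner=DataflowRunner",
        f"--project={project}",
        f"--region={region}",
        f"--temp_location=gs://{temp_bucket}/temp/",
        f"--pipeline_spec_file={spec_file}",
        f"--staging_location=gs://{staging_bucket}/staging",
        f"--sdk_location={sdk_tarball}",
        # REGEX,IMAGE pair: container images matching ".*java.*" are meant
        # to be replaced by the custom Java SDK image.
        f"--sdk_harness_container_image_overrides=.*java.*,{java_image}",
        "--streaming",
    ]


if __name__ == "__main__":
    # Placeholder values; substitute your own project, buckets and paths.
    args = yaml_dataflow_args(
        project="my-project",
        region="us-east4",
        temp_bucket="my-temp-bucket",
        staging_bucket="my-staging-bucket",
        spec_file="/path/to/read_k5.yml",
        sdk_tarball="/path/to/apache-beam-2.52.0.dev0.tar.gz",
        java_image="us-east4-docker.pkg.dev/my-project/beam/beam_java8_sdk:latest",
    )
    # shlex.join quotes any argument containing shell metacharacters such as "*".
    print(shlex.join(args))
```

Passing the list to `subprocess.run(args, check=True)` would launch the same command without going through a shell at all.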
I got the following error:
```
INFO:root:Starting a JAR-based expansion service from JAR /home/ferran_fernandez/beam/sdks/java/io/expansion-service/build/libs/beam-sdks-java-io-expansion-service-2.52.0-SNAPSHOT.jar
INFO:apache_beam.utils.subprocess_server:Starting service with ['java' '-jar' '/home/ferran_fernandez/beam/sdks/java/io/expansion-service/build/libs/beam-sdks-java-io-expansion-service-2.52.0-SNAPSHOT.jar' '53199' '--filesToStage=/home/ferran_fernandez/beam/sdks/java/io/expansion-service/build/libs/beam-sdks-java-io-expansion-service-2.52.0-SNAPSHOT.jar']
INFO:apache_beam.utils.subprocess_server:Starting expansion service at localhost:53199
INFO:apache_beam.utils.subprocess_server:Oct 16, 2023 10:16:36 AM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
INFO:apache_beam.utils.subprocess_server:INFO: Registering external transforms: [beam:transform:org.apache.beam:kafka_read_with_metadata:v1, beam:transform:org.apache.beam:kafka_read_without_metadata:v1, beam:transform:org.apache.beam:kafka_write:v1, beam:external:java:generate_sequence:v1, beam:transform:combine_globally:v1, beam:transform:combine_grouped_values:v1, beam:transform:combine_per_key:v1, beam:transform:create_view:v1, beam:transform:flatten:v1, beam:transform:group_by_key:v1, beam:transform:group_into_batches:v1, beam:transform:group_into_batches_with_sharded_key:v1, beam:transform:impulse:v1, beam:transform:reshuffle:v1, beam:transform:sdf_process_keyed_elements:v1, beam:transform:teststream:v1, beam:transform:window_into:v1, beam:transform:write_files:v1]
INFO:apache_beam.utils.subprocess_server:
INFO:apache_beam.utils.subprocess_server:Registered transforms:
INFO:apache_beam.utils.subprocess_server: beam:transform:org.apache.beam:kafka_read_with_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@7c469c48
INFO:apache_beam.utils.subprocess_server: beam:transform:org.apache.beam:kafka_read_without_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@12e61fe6
INFO:apache_beam.utils.subprocess_server: beam:transform:org.apache.beam:kafka_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@7ee955a8
INFO:apache_beam.utils.subprocess_server: beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@1677d1
INFO:apache_beam.utils.subprocess_server: beam:transform:combine_globally:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@48fa0f47
INFO:apache_beam.utils.subprocess_server: beam:transform:combine_grouped_values:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@6ac13091
INFO:apache_beam.utils.subprocess_server: beam:transform:combine_per_key:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@5e316c74
INFO:apache_beam.utils.subprocess_server: beam:transform:create_view:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@6d2a209c
INFO:apache_beam.utils.subprocess_server: beam:transform:flatten:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@75329a49
INFO:apache_beam.utils.subprocess_server: beam:transform:group_by_key:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@161479c6
INFO:apache_beam.utils.subprocess_server: beam:transform:group_into_batches:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@4313f5bc
INFO:apache_beam.utils.subprocess_server: beam:transform:group_into_batches_with_sharded_key:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@7f010382
INFO:apache_beam.utils.subprocess_server: beam:transform:impulse:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@1e802ef9
INFO:apache_beam.utils.subprocess_server: beam:transform:reshuffle:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@2b6faea6
INFO:apache_beam.utils.subprocess_server: beam:transform:sdf_process_keyed_elements:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@778d1062
INFO:apache_beam.utils.subprocess_server: beam:transform:teststream:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@670002
INFO:apache_beam.utils.subprocess_server: beam:transform:window_into:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@1f0f1111
INFO:apache_beam.utils.subprocess_server: beam:transform:write_files:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@49c386c8
INFO:apache_beam.utils.subprocess_server:
INFO:apache_beam.utils.subprocess_server:Registered SchemaTransformProviders:
INFO:apache_beam.utils.subprocess_server: beam:schematransform:org.apache.beam:kafka_read:v1
INFO:apache_beam.utils.subprocess_server: beam:schematransform:org.apache.beam:kafka_write:v1
INFO:apache_beam.utils.subprocess_server:Oct 16, 2023 10:16:37 AM org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO:apache_beam.utils.subprocess_server:INFO: Expanding 'ReadFromKafka/beam:schematransform:org.apache.beam:kafka_read:v1' with URN 'beam:expansion:payload:schematransform:v1'
INFO:apache_beam.utils.subprocess_server:Dependencies list: {}
Traceback (most recent call last):
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 452, in expand_leaf_transform
    outputs = inputs | scope.unique_name(spec, ptransform) >> ptransform
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pvalue.py", line 137, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pipeline.py", line 651, in apply
    return self.apply(
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pipeline.py", line 662, in apply
    return self.apply(transform, pvalueish)
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pipeline.py", line 708, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/runners/runner.py", line 203, in apply
    return self.apply_PTransform(transform, input, options)
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/runners/runner.py", line 207, in apply_PTransform
    return transform.expand(input)
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 370, in recording_expand
    result = original_expand(pvalue)
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/transforms/external.py", line 425, in expand
    return pcolls | self._payload_builder.identifier() >> ExternalTransform(
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pvalue.py", line 137, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pipeline.py", line 651, in apply
    return self.apply(
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pipeline.py", line 662, in apply
    return self.apply(transform, pvalueish)
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pipeline.py", line 708, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/runners/runner.py", line 203, in apply
    return self.apply_PTransform(transform, input, options)
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/runners/runner.py", line 207, in apply_PTransform
    return transform.expand(input)
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/transforms/external.py", line 730, in expand
    raise RuntimeError(response.error)
RuntimeError: java.lang.IllegalStateException: Missing required properties: topicPattern keyCoder valueCoder watermarkFn maxReadTime startReadTime stopReadTime watchTopicPartitionDuration offsetConsumerConfig keyDeserializerProvider valueDeserializerProvider checkStopReadingFn
	at org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Read$Builder.build(AutoValue_KafkaIO_Read.java:585)
	at org.apache.beam.sdk.io.kafka.KafkaIO.read(KafkaIO.java:574)
	at org.apache.beam.sdk.io.kafka.KafkaIO.readBytes(KafkaIO.java:553)
	at org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformProvider$3.expand(KafkaReadSchemaTransformProvider.java:131)
	at org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformProvider$3.expand(KafkaReadSchemaTransformProvider.java:127)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:496)
	at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:467)
	at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:620)
	at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:704)
	at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:306)
	at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
	at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:355)
	at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:867)
	at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
```
I tried rolling back my changes and using the original code, but I got the
same result. My guess is that I'm not building this the right way (I suspect
it has to do with the AutoValue-generated classes, but I'm not sure).
Is there a specific gradlew command I should be using?
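For context on the error itself: it is thrown by an AutoValue-generated builder (`AutoValue_KafkaIO_Read$Builder.build`). AutoValue treats a builder property as required unless it is marked `@Nullable` (or is an `Optional`), and the generated `build()` reports every required property that was never set in one `IllegalStateException`. The Python sketch below is only an analogue of that check, with hypothetical class and property names; it is not Beam or AutoValue code.

```python
class AutoValueStyleBuilder:
    """Python analogue of an AutoValue-generated builder's required-property check.

    Hypothetical example: the property names only loosely echo the error above.
    """

    REQUIRED = ("topics", "keyCoder", "valueCoder")
    NULLABLE = ("topicPattern", "maxReadTime")

    def __init__(self):
        self._values = {}

    def set(self, name, value):
        if name not in self.REQUIRED + self.NULLABLE:
            raise AttributeError(f"unknown property: {name}")
        self._values[name] = value
        return self  # allow chained calls, like a Java builder

    def build(self):
        # Collect every required property that was never set, then fail once
        # listing all of them, mirroring "Missing required properties: a b c".
        missing = [name for name in self.REQUIRED if name not in self._values]
        if missing:
            raise RuntimeError("Missing required properties: " + " ".join(missing))
        # Nullable properties that were never set simply come back as None.
        return {name: self._values.get(name) for name in self.REQUIRED + self.NULLABLE}


if __name__ == "__main__":
    try:
        AutoValueStyleBuilder().set("topics", ["my-topic"]).build()
    except RuntimeError as err:
        print(err)  # Missing required properties: keyCoder valueCoder
```

Read this way, the list in the error names properties that the generated `KafkaIO.Read` builder considered required. That fits the suspicion above about how the AutoValue classes were built, but does not by itself confirm it.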