I would have expected the jar to contain a
META-INF/services/org.apache.beam.sdk.expansion.ExternalTransformRegistrar
file that lists the fully qualified class name of BigtableRegistrar. See
https://repo1.maven.org/maven2/org/apache/beam/beam-sdks-java-io-kafka/2.43.0/beam-sdks-java-io-kafka-2.43.0.jar
for an example of how Java's ServiceLoader expects the jar to be laid out.

It looks like either the Bazel build is not generating the META-INF/services
file that the `@AutoService` annotation is responsible for, or the step that
packages the build outputs into the jar is leaving that file out.
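
One quick way to confirm this is to look inside the jar for the service file. The sketch below is only an illustration using Python's standard zipfile module; the helper name `registered_providers` is made up for this example and is not part of Beam.

```python
import zipfile

SERVICE = "org.apache.beam.sdk.expansion.ExternalTransformRegistrar"


def registered_providers(jar_path, service=SERVICE):
    """Return the provider class names listed in META-INF/services/<service>.

    An empty list means the jar has no such entry, so Java's ServiceLoader
    would not discover any registrar from it.
    """
    entry = "META-INF/services/" + service
    with zipfile.ZipFile(jar_path) as jar:
        if entry not in jar.namelist():
            return []
        text = jar.read(entry).decode("utf-8")
    providers = []
    for line in text.splitlines():
        # Provider files allow '#' comments and blank lines; skip both.
        name = line.split("#", 1)[0].strip()
        if name:
            providers.append(name)
    return providers
```

For the libjava_hbase.jar listed in the quoted message, this should return an empty list, since `jar tf` shows only META-INF/MANIFEST.MF under META-INF/. Once the service file is packaged, it should list energy.camus.beam.BigtableRegistrar.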

On Wed, Dec 28, 2022 at 11:30 PM Lina Mårtensson via dev <
dev@beam.apache.org> wrote:

> I kept working with an ExternalTransformRegistrar solution (although if
> there's an easier way, I'm all ears), and I have Java code that builds, and
> a Python connector that tries to use it.
>
> My current issue is that the expansion service that's started up doesn't
> find my transform using the URN provided:
> RuntimeError: java.lang.UnsupportedOperationException: Unknown urn:
> beam:external:CAMUS:bigtable_read:v1
>
> And I can see that my transform wasn't registered:
>
> INFO:apache_beam.utils.subprocess_server:b'INFO: Registering external
> transforms: [beam:transform:org.apache.beam:pubsub_read:v1,
> beam:transform:org.apache.beam:pubsub_write:v1,
> beam:transform:org.apache.beam:pubsublite_write:v1,
> beam:transform:org.apache.beam:pubsublite_read:v1,
> beam:transform:org.apache.beam:spanner_insert:v1,
> beam:transform:org.apache.beam:spanner_update:v1,
> beam:transform:org.apache.beam:spanner_replace:v1,
> beam:transform:org.apache.beam:spanner_insert_or_update:v1,
> beam:transform:org.apache.beam:spanner_delete:v1,
> beam:transform:org.apache.beam:spanner_read:v1,
> beam:transform:org.apache.beam:schemaio_bigquery_read:v1,
> beam:transform:org.apache.beam:schemaio_bigquery_write:v1,
> beam:transform:org.apache.beam:schemaio_datastoreV1_read:v1,
> beam:transform:org.apache.beam:schemaio_datastoreV1_write:v1,
> beam:transform:org.apache.beam:schemaio_pubsub_read:v1,
> beam:transform:org.apache.beam:schemaio_pubsub_write:v1,
> beam:transform:org.apache.beam:schemaio_jdbc_read:v1,
> beam:transform:org.apache.beam:schemaio_jdbc_write:v1,
> beam:transform:org.apache.beam:schemaio_avro_read:v1,
> beam:transform:org.apache.beam:schemaio_avro_write:v1,
> beam:external:java:generate_sequence:v1]'
>
> I'm creating the expansion service in code like this:
>
>         expansion_service = BeamJarExpansionService(
>             'sdks:java:io:google-cloud-platform:expansion-service:shadowJar',
>             extra_args=["{{PORT}}", '--javaClassLookupAllowlistFile=*'],
>             classpath=[
>                 "/home/builder/xlang/bando/bazel-bin/bigtable/libjava_hbase.jar"])
>
> where libjava_hbase.jar was built by Bazel and contains my code:
>
> $ jar tf libjava_hbase.jar
> META-INF/
> META-INF/MANIFEST.MF
> energy/
> energy/camus/
> energy/camus/beam/
> energy/camus/beam/BigtableRegistrar$BigtableReadBuilder$Configuration.class
> energy/camus/beam/BigtableRegistrar$BigtableReadBuilder.class
> energy/camus/beam/BigtableRegistrar$CrossLanguageConfiguration.class
> energy/camus/beam/BigtableRegistrar.class
>
> The relevant part of my code that does the registration looks like this:
>
> @AutoService(ExternalTransformRegistrar.class)
> public class BigtableRegistrar implements ExternalTransformRegistrar {
>
>     static final String READ_URN = "beam:external:CAMUS:bigtable_read:v1";
>
>     @Override
>     public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
>         return ImmutableMap.of(READ_URN, new BigtableReadBuilder());
>     }
>
> What am I missing that prevents my transform from being registered?
>
> Thanks,
> -Lina
>
> On Tue, Dec 27, 2022 at 5:11 PM Lina Mårtensson <lina@camus.energy> wrote:
>
>> I finally was able to get back to this and try to make an x-language
>> transform for Bigtable to be used in Python, but I could use some help.
>>
>> I started out with the Bigtable
>> <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java>
>> library, and it seemed like I should be able to go with option 1 here
>> <https://beam.apache.org/documentation/programming-guide/#1311-creating-cross-language-java-transforms>,
>> i.e. not write any Java code.
>>
>> As a non-Java user, it still wasn't obvious how to get this working, but
>> I eventually got it:
>>
>>         java_transform = JavaExternalTransform(
>>             'org.apache.beam.sdk.io.gcp.bigtable.BigtableIO',
>>             BeamJarExpansionService(
>>                 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar',
>>                 extra_args=["{{PORT}}", '--javaClassLookupAllowlistFile=*'])
>>         ).read().withProjectId(projectId="myProjectId")
>>
>>         data = p | 'Read from Bigtable' >> java_transform
>>
>> It wasn't clear to me how to find the right jar to use, or that I needed
>> to add the extra_args when specifying my own JAR.
>>
>> However, I get the following error:
>>
>> RuntimeError: java.lang.RuntimeException: Expected to find exactly one
>> matching method in transform Read{config=BigtableConfig{projectId=null,
>> instanceId=null, tableId=, bigtableOptionsConfigurator=null, options=null},
>> readOptions=BigtableReadOptions{rowFilter=null,
>> keyRanges=[ByteKeyRange{startKey=[], endKey=[]}]}} for BuilderMethodname:
>> "withProjectId"
>>
>> schema {
>>   fields {
>>     name: "projectId"
>>     type {
>>       atomic_type: STRING
>>     }
>>   }
>>   id: "8b43f1f0-313f-4b46-9559-d9d11fd7ecf2"
>> }
>> payload: "\001\000\vmyProjectId"
>>  but found 2
>> at org.apache.beam.sdk.expansion.service.JavaClassLookupTransformProvider.getMethod(JavaClassLookupTransformProvider.java:236)
>> at org.apache.beam.sdk.expansion.service.JavaClassLookupTransformProvider.applyBuilderMethods(JavaClassLookupTransformProvider.java:145)
>> at org.apache.beam.sdk.expansion.service.JavaClassLookupTransformProvider.getTransform(JavaClassLookupTransformProvider.java:129)
>> at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:396)
>> at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:515)
>> at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:595)
>> at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:220)
>> at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
>> at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:340)
>> at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
>> at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>> at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
>> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> at java.base/java.lang.Thread.run(Thread.java:829)
>>
>> I believe this is pointing out that there are two withProjectId methods
>> - one that takes String
>> <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L285>,
>> one that takes a ValueProvider
>> <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L273>
>> .
>>
>> I take it this means that the write-no-Java option won't work here? The
>> HBase
>> <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java>
>> implementation looks like it would have the same issue.
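
To illustrate why two overloads can trip up a name-based lookup, here is a toy model in Python. It is not Beam's actual JavaClassLookupTransformProvider logic; it only assumes that candidates are selected by method name and argument count, which is enough to show how a single `withProjectId(projectId=...)` call from Python can match both Java signatures mentioned above. The `Method` tuple and `find_matching` helper are invented for this sketch.

```python
from collections import namedtuple

# Hypothetical stand-in for a Java method signature.
Method = namedtuple("Method", ["name", "param_types"])

# The two overloads described in the message above.
READ_METHODS = [
    Method("withProjectId", ("String",)),
    Method("withProjectId", ("ValueProvider<String>",)),
]


def find_matching(methods, name, num_args):
    """Select candidates by name and argument count only (toy rule)."""
    return [m for m in methods
            if m.name == name and len(m.param_types) == num_args]


matches = find_matching(READ_METHODS, "withProjectId", 1)
if len(matches) != 1:
    print("Expected exactly one matching method but found", len(matches))
```

Under that assumed rule both overloads are candidates, which mirrors the "but found 2" in the error.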
>>
>> Before I try and write Java code and convince my team that we're OK to
>> have some Java code, I wanted to check if there's anything I'm missing, or
>> if I'll need to go with the process described in 13.1.1.2 and implement an
>> ExternalTransformBuilder and an ExternalTransformRegistrar.
>>
>> Thanks!
>> -Lina
>>
>>
>> On Thu, Jul 28, 2022 at 5:37 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>>
>>>
>>> On Thu, Jul 28, 2022 at 4:51 PM Lina Mårtensson <lina@camus.energy>
>>> wrote:
>>>
>>>> Thanks for the detailed answers!
>>>>
>>>> I totally get the points about development & maintenance cost, and,
>>>> from a user perspective, about getting the performance right.
>>>>
>>>> I decided to try out the Spanner connector to get a sense of how well
>>>> the x-language approach works in our world, since that's an existing
>>>> x-language connector.
>>>> Overall, it works and with minimal intervention as you say - it is
>>>> very slow, though.
>>>> I'm a little confused about "portable runners" - if I understand this
>>>> correctly, this means we couldn't run with the DirectRunner anymore if
>>>> using an x-language connector? (At least it didn't work when I tried
>>>> it.)
>>>>
>>>
>>> You'll have to use the portable DirectRunner -
>>> https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/portability
>>>
>>> The job service for this can be started using the following command:
>>> python apache_beam/runners/portability/local_job_service_main.py -p <port>
>>>
>>> Instructions for using this should be similar to those here (under
>>> "Portable (Java/Python/Go)"): https://beam.apache.org/documentation/runners/flink/
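
As a rough sketch of the Python side, the pipeline options would then point at that job service. `--runner=PortableRunner` and `--job_endpoint` are standard Beam pipeline options; the helper names and the trivial pipeline are placeholders for this example, and the port must match whatever was passed to `-p` above.

```python
def portable_runner_args(port):
    """Build pipeline flags that target a locally started job service."""
    return [
        "--runner=PortableRunner",
        "--job_endpoint=localhost:%d" % port,
    ]


def run(port):
    # Imported here so the helper above can be used without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(portable_runner_args(port))
    with beam.Pipeline(options=options) as p:
        # Placeholder pipeline; a real job would apply the external transform.
        _ = p | beam.Create(["hello"]) | beam.Map(print)
```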
>>>
>>>
>>>>
>>>> My test of running a trivial GCS-to-Spanner job with 18 KB of input on
>>>> Dataflow takes about 15 minutes end-to-end. 5+ minutes of that is
>>>> uploading the expansion service to GCS, and the startup time on
>>>> Dataflow takes several minutes as well:
>>>> "INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS
>>>> upload to
>>>> gs://dataflow-staging-us-central1-92d40d9a13427cbb4dfa41465ce57494/beamapp-lina-0728173601-761137-4rfo0mb9.1659029761.762052/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar
>>>> in 337 seconds."
>>>> Is that expected, or are we doing something strange here? My internet
>>>> isn't very fast here, so these up/downloads can really slow things
>>>> down.
>>>> I tried adding --prebuild_sdk_container_engine=cloud_build but that
>>>> doesn't affect the .jar file.
>>>>
>>>
>>> There are several things contributing to the end-to-end execution time.
>>>
>>> * Time to stage dependencies including the shaded jar file (that is used
>>> both by the expansion service and at runtime).
>>>
>>> This is cross-language only. But you control the jar file. You are
>>> trying to use the
>>> existing beam-sdks-java-io-google-cloud-platform-expansion-service jar
>>> which is a 114 MB file.
>>>
>>> https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-google-cloud-platform-expansion-service/2.39.0
>>>
>>> I'm not exactly sure why it took 337 seconds, but it could be a network
>>> issue. You could also define a new, smaller expansion service jar just
>>> for Spanner if needed.
>>>
>>> * Time to start the job
>>> This is mostly common to both cross-language and non-cross-language
>>> jobs. Starting up the Dataflow worker pool can take some time.
>>> Cross-language could take slightly longer since we need to start both Java
>>> and Python containers, but this is a fixed cost (not dependent on the
>>> job/input size).
>>>
>>> * Time to execute the job.
>>> This is what I'd compare if you want to decide on a pure-Python vs a
>>> Java cross-language implementation just based on performance.
>>> The cross-language version would have an added cost to serialize data and
>>> send it across SDK harness containers (within the same VM for Dataflow).
>>> On the other hand, the cross-language version would be reading using a
>>> Java implementation, which I'd expect to be more performant than a pure
>>> Python read implementation.
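
A back-of-the-envelope check on the staging time, assuming the whole 114 MB jar was uploaded in the 337 seconds reported in the log (the size is the approximate figure quoted above, so the result is only indicative):

```python
JAR_SIZE_MB = 114      # approximate size of the expansion-service jar
UPLOAD_SECONDS = 337   # from the "Completed GCS upload ... in 337 seconds" log

mb_per_second = JAR_SIZE_MB / UPLOAD_SECONDS
mbit_per_second = mb_per_second * 8

# prints "0.34 MB/s (about 2.7 Mbit/s)"
print("%.2f MB/s (about %.1f Mbit/s)" % (mb_per_second, mbit_per_second))
```

An effective rate of roughly 2.7 Mbit/s would be consistent with a slow uplink rather than with anything specific to Beam, which is why a smaller jar is the obvious lever here.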
>>>
>>> Hope this helps.
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>
>>>
>>>>
>>>> If we can get this to a workable time, and/or iterate locally, then I
>>>> think an x-language connector for Bigtable could work out well.
>>>> Otherwise we might have to look at a native Python version after all.
>>>>
>>>> Thanks!
>>>> -Lina
>>>>
>>>> On Wed, Jul 27, 2022 at 1:39 PM Chamikara Jayalath <
>>>> chamik...@google.com> wrote:
>>>> >
>>>> >
>>>> >
>>>> > On Wed, Jul 27, 2022 at 11:10 AM Lina Mårtensson <lina@camus.energy>
>>>> wrote:
>>>> >>
>>>> >> Thanks Cham!
>>>> >>
>>>> >> Could you provide some more detail on your preference for developing
>>>> a
>>>> >> Python wrapper rather than implementing a source purely in Python?
>>>> >
>>>> >
>>>> > Below are the main advantages of developing a cross-language transform
>>>> > over implementing this natively in Python.
>>>> >
>>>> > * Reduced cost of development
>>>> >
>>>> > It's much easier to develop a cross-language wrapper of the Java
>>>> > source than to re-implement the source in Python. Sources are some of
>>>> > the most complex code we have in Beam, and sources control the
>>>> > parallelization of the pipeline (for example, splitting and dynamic work
>>>> > rebalancing for supported runners). So getting this code wrong can
>>>> > result in hard-to-track data loss/duplication issues.
>>>> > Additionally, based on my experience, it's very hard to get a source
>>>> > implementation correct and performant on the first try. It could take
>>>> > additional benchmarks/user feedback over time to get the source
>>>> > production ready.
>>>> > The Java BT source is already well battle-tested (actually we have two
>>>> > Java implementations [1][2] currently). So I would rather use a Java BT
>>>> > connector as a cross-language transform than re-implement sources for
>>>> > other SDKs.
>>>> >
>>>> > * Minimal maintenance cost
>>>> >
>>>> > Developing a source/sink is just a part of the story. We (as a
>>>> > community) have to maintain it over time and make sure that ongoing
>>>> > issues/feature requests are adequately handled. In the past, we have had
>>>> > cases where sources/sinks are available for multiple SDKs but one
>>>> > is significantly better than others when it comes to the feature set
>>>> > (for example, BigQuery). Cross-language will make this easier and will
>>>> > allow us to maintain key logic in a single place.
>>>> >
>>>> >>
>>>> >>
>>>> >> If I look at the instructions for using the x-language Spanner
>>>> >> connector, then using this - from the user's perspective - would
>>>> >> involve installing a Java runtime.
>>>> >> That's not terrible, but I fear that getting this to work with bazel
>>>> >> might end up being more trouble than expected. (That has often
>>>> >> happened here, and we have enough trouble with getting Python 3.9 and
>>>> >> 3.10 to co-exist.)
>>>> >
>>>> >
>>>> > From an end-user perspective, all they should have to do is make sure
>>>> > that Java is available on the machine from which the job is submitted.
>>>> > Beam has features that automatically start up the cross-language
>>>> > expansion services needed during job submission, so users should not
>>>> > have to do anything other than that.
>>>> >
>>>> > At job execution, Beam (portable) uses Docker-based SDK harness
>>>> > containers and we already release appropriate containers for each SDK.
>>>> > The runners should seamlessly download the containers needed to execute
>>>> > the job.
>>>> >
>>>> > That said, the main downside of cross-language today is runner
>>>> > support. Cross-language transform support is only available for portable
>>>> > Beam runners (for example, Dataflow Runner v2), but this is the direction
>>>> > Beam runners are going anyway.
>>>> >
>>>> >>
>>>> >>
>>>> >> There are a few of us at our small start-up that have written
>>>> >> MapReduces and similar in the past and are completely convinced by
>>>> the
>>>> >> Beam/Dataflow model. But many others have no previous experience and
>>>> >> are skeptical, and see this new tool we're introducing as something
>>>> >> that's more trouble than it's worth, and something they'd rather
>>>> avoid
>>>> >> - even when we see how lots of their use cases could be made much
>>>> >> easier using Beam. I'm worried that every extra hoop to jump through
>>>> >> will make it less likely to be widely used for us. Because of that,
>>>> my
>>>> >> bias would be towards having a Python connector rather than
>>>> >> x-language, and I would find it really helpful to learn about why you
>>>> >> both favor the x-language option.
>>>> >
>>>> >
>>>> > I understand your concerns. It's certainly possible to develop the
>>>> > same connector in multiple SDKs (and we provide SDF source framework
>>>> > support in all SDK languages). But hopefully my comments above will give
>>>> > you an idea of the downsides of this approach :).
>>>> >
>>>> > Thanks,
>>>> > Cham
>>>> >
>>>> > [1]
>>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
>>>> > [2] https://cloud.google.com/bigtable/docs/hbase-dataflow-java
>>>> >
>>>> >>
>>>> >>
>>>> >> Thanks!
>>>> >> -Lina
>>>> >>
>>>> >> On Tue, Jul 26, 2022 at 6:11 PM Chamikara Jayalath <
>>>> chamik...@google.com> wrote:
>>>> >> >
>>>> >> >
>>>> >> >
>>>> >> > On Mon, Jul 25, 2022 at 12:53 PM Lina Mårtensson via dev <
>>>> dev@beam.apache.org> wrote:
>>>> >> >>
>>>> >> >> Hi dev,
>>>> >> >>
>>>> >> >> We're starting to incorporate BigTable in our stack and I've
>>>> delighted
>>>> >> >> my co-workers with how easy it was to create some BigTables with
>>>> >> >> Beam... but there doesn't appear to be a reader for BigTable in
>>>> >> >> Python.
>>>> >> >>
>>>> >> >> First off, is there a good reason why not/any reason why it would
>>>> be difficult?
>>>> >> >
>>>> >> >
>>>> >> > There was a previous effort to implement a Python BT source, but
>>>> >> > that was not completed:
>>>> >> > https://github.com/apache/beam/pull/11295#issuecomment-646378304
>>>> >> >
>>>> >> >>
>>>> >> >>
>>>> >> >> I could write one, but before I start, I'd love some input to
>>>> make it easier.
>>>> >> >>
>>>> >> >> It appears that there would be two options: either write one in
>>>> >> >> Python, or try to set one up with x-language from Java which I
>>>> see is
>>>> >> >> done e.g. with the Spanner IO Connector.
>>>> >> >> Any recommendation on which one to pick or potential pitfalls in
>>>> either choice?
>>>> >> >>
>>>> >> >> If I write one in Python, what should I think about?
>>>> >> >> It is not obvious to me how to achieve parallelization, so any
>>>> tips
>>>> >> >> here would be welcome.
>>>> >> >
>>>> >> >
>>>> >> > I would strongly prefer developing a Python wrapper for the
>>>> >> > existing Java BT source using Beam's Multi-language Pipelines framework
>>>> >> > over developing a new Python source.
>>>> >> >
>>>> >> > https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines
>>>> >> >
>>>> >> > Thanks,
>>>> >> > Cham
>>>> >> >
>>>> >> >
>>>> >> >>
>>>> >> >>
>>>> >> >> Thanks!
>>>> >> >> -Lina
>>>>
>>>
