[ 
https://issues.apache.org/jira/browse/BEAM-6913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mousa HAMAD updated BEAM-6913:
------------------------------
    Description: 
Whenever my pipeline reads from Spanner, it runs indefinitely and never
finishes. If I update the Spanner dependency
(_com.google.cloud:google-cloud-spanner_) to, e.g., _1.11.0_, then everything
works as expected.

Consider the following simple pipeline, which never ends:
{code:java}
/**
 * Minimal reproduction for BEAM-6913: reads one column from a Cloud Spanner
 * table on the DirectRunner, averages it, and writes the result as text.
 */
public class Prototype_Spanner {

    // Coordinates of the Spanner table to read. They are never reassigned,
    // so they are declared final.
    private static final String INSTANCE_ID = "XYZ";
    private static final String DATABASE_ID = "test_beam";
    private static final String TABLE_NAME = "item";

    /**
     * Builds the pipeline and blocks until it finishes.
     *
     * <p>According to this report, the call never returns unless the
     * google-cloud-spanner dependency is overridden (e.g. to 1.11.0).
     */
    private static void runExample() {
        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(DirectRunner.class);

        Pipeline pipeline = Pipeline.create(options);

        pipeline
                // Read only the "price" column of the table.
                .apply("Read", SpannerIO.read()
                        .withInstanceId(INSTANCE_ID)
                        .withDatabaseId(DATABASE_ID)
                        .withTable(TABLE_NAME)
                        .withColumns("price"))
                // Turn each Spanner row into its price value.
                .apply("Extract Price", MapElements
                        .into(TypeDescriptors.longs())
                        .via((Struct struct) -> struct.getLong("price")))
                // Average all prices.
                .apply("Calculate Mean", Mean.globally())
                // Render the mean as text so TextIO can write it.
                .apply("Map to string", MapElements
                        .into(TypeDescriptor.of(String.class))
                        .via(Object::toString))
                .apply("Write", TextIO.write().to("/tmp/output"));

        pipeline.run().waitUntilFinish();
    }

    /** Entry point: runs the reproduction pipeline; {@code args} is unused. */
    public static void main(String[] args) {
        runExample();
    }

}
{code}
Following is my full list of dependencies:
{code:java}
// Repository used to resolve the dependencies below.
repositories {
    mavenCentral()
}

// Versions shared by the Beam and Spark artifacts.
ext {
    beamVersion = '2.11.0'
    sparkVersion = '2.3.3'
}

dependencies {
    // Each declaration stays on one line: a line break between the
    // configuration name and its coordinate string breaks the declaration.
    compile "org.apache.beam:beam-sdks-java-core:$beamVersion"
    compile "org.apache.beam:beam-sdks-java-extensions-join-library:$beamVersion"
    compile "org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:$beamVersion"
    compile "org.apache.beam:beam-sdks-java-io-google-cloud-platform:$beamVersion"
    compile "org.apache.beam:beam-runners-core-java:$beamVersion"
    compile "org.apache.beam:beam-runners-direct-java:$beamVersion"
    compile "org.apache.beam:beam-runners-spark:$beamVersion"

    compile "org.apache.spark:spark-core_2.11:$sparkVersion"
    compile "org.apache.spark:spark-streaming_2.11:$sparkVersion"

    // This line fixed the issue for me
    // compile "com.google.cloud:google-cloud-spanner:1.11.0"

    testCompile "junit:junit:4.12"
}
{code}

  was:
Whenever my pipeline reads from Spanner, it runs indefinitely and never
finishes. If I update the Spanner dependency
(_com.google.cloud:google-cloud-spanner_) to, e.g., _1.11.0_, then everything
works as expected.

Consider the following simple pipeline, which never ends:
{code:java}
/**
 * Minimal reproduction for BEAM-6913: reads one column from a Cloud Spanner
 * table on the DirectRunner, averages it, and writes the result as text.
 */
public class Prototype_Spanner {

    // Coordinates of the Spanner table to read. They are never reassigned,
    // so they are declared final.
    private static final String INSTANCE_ID = "XYZ";
    private static final String DATABASE_ID = "test_beam";
    private static final String TABLE_NAME = "item";

    /**
     * Builds the pipeline and blocks until it finishes.
     *
     * <p>According to this report, the call never returns unless the
     * google-cloud-spanner dependency is overridden (e.g. to 1.11.0).
     */
    private static void runExample() {
        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(DirectRunner.class);

        Pipeline pipeline = Pipeline.create(options);

        pipeline
                // Read only the "price" column of the table.
                .apply("Read", SpannerIO.read()
                        .withInstanceId(INSTANCE_ID)
                        .withDatabaseId(DATABASE_ID)
                        .withTable(TABLE_NAME)
                        .withColumns("price"))
                // Turn each Spanner row into its price value.
                .apply("Extract Price", MapElements
                        .into(TypeDescriptors.longs())
                        .via((Struct struct) -> struct.getLong("price")))
                // Average all prices.
                .apply("Calculate Mean", Mean.globally())
                // Render the mean as text so TextIO can write it.
                .apply("Map to string", MapElements
                        .into(TypeDescriptor.of(String.class))
                        .via(Object::toString))
                .apply("Write", TextIO.write().to("/tmp/output"));

        pipeline.run().waitUntilFinish();
    }

    /** Entry point: runs the reproduction pipeline; {@code args} is unused. */
    public static void main(String[] args) {
        runExample();
    }

}
{code}


> Reading data from Spanner never ends
> ------------------------------------
>
>                 Key: BEAM-6913
>                 URL: https://issues.apache.org/jira/browse/BEAM-6913
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.11.0
>         Environment: macOS Mojave (10.14.3)
>            Reporter: Mousa HAMAD
>            Priority: Major
>
> Whenever my pipeline reads from Spanner, the code runs infinitely. If I 
> update the spanner dependency (_com.google.cloud:google-cloud-spanner_) to 
> e.g., _1.11.0,_ then everything works as expected.
> Consider the following simple pipeline, which never ends:
> {code:java}
> public class Prototype_Spanner {
>     private static String INSTANCE_ID = "XYZ";
>     private static String DATABASE_ID = "test_beam";
>     private static String TABLE_NAME = "item";
>     private static void runExample() {
>         PipelineOptions options = PipelineOptionsFactory.create();
>         options.setRunner(DirectRunner.class);
>         Pipeline pipeline = Pipeline.create(options);
>         pipeline
>                 .apply("Read", SpannerIO.read()
>                         .withInstanceId(INSTANCE_ID)
>                         .withDatabaseId(DATABASE_ID)
>                         .withTable(TABLE_NAME)
>                         .withColumns("price"))
>                 .apply("Extract Price", MapElements
>                         .into(TypeDescriptors.longs())
>                         .via((Struct struct) -> struct.getLong("price")))
>                 .apply("Calculate Mean", Mean.globally())
>                 .apply("Map to string", MapElements
>                         .into(TypeDescriptor.of(String.class))
>                         .via(Object::toString))
>                 .apply("Write", TextIO.write().to("/tmp/output"));
>         pipeline.run().waitUntilFinish();
>     }
>     public static void main(String[] args) {
>         runExample();
>     }
> }
> {code}
> Following is my full list of dependencies:
> {code:java}
> repositories {
>     mavenCentral()
> }
> ext {
>     beamVersion = '2.11.0'
>     sparkVersion = '2.3.3'
> }
> dependencies {
>     compile "org.apache.beam:beam-sdks-java-core:$beamVersion"
>     compile 
> "org.apache.beam:beam-sdks-java-extensions-join-library:$beamVersion"
>     compile 
> "org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:$beamVersion"
>     compile 
> "org.apache.beam:beam-sdks-java-io-google-cloud-platform:$beamVersion"
>     compile "org.apache.beam:beam-runners-core-java:$beamVersion"
>     compile "org.apache.beam:beam-runners-direct-java:$beamVersion"
>     compile "org.apache.beam:beam-runners-spark:$beamVersion"
>     compile "org.apache.spark:spark-core_2.11:$sparkVersion"
>     compile "org.apache.spark:spark-streaming_2.11:$sparkVersion"
>     // This line fixed the issue for me
>     // compile "com.google.cloud:google-cloud-spanner:1.11.0"
>     testCompile "junit:junit:4.12"
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to