[ https://issues.apache.org/jira/browse/BEAM-6913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mousa HAMAD updated BEAM-6913:
------------------------------
Description:
Whenever my pipeline reads from Spanner, it never terminates. If I update
the Spanner dependency (_com.google.cloud:google-cloud-spanner_) to, e.g.,
_1.11.0_, then everything works as expected.
Consider the following simple pipeline, which never ends:
{code:java}
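import com.google.cloud.spanner.Struct;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Mean;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;

public class Prototype_Spanner {
    private static final String INSTANCE_ID = "XYZ";
    private static final String DATABASE_ID = "test_beam";
    private static final String TABLE_NAME = "item";

    private static void runExample() {
        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(DirectRunner.class);
        Pipeline pipeline = Pipeline.create(options);

        pipeline
            // Read the "price" column of every row in the table.
            .apply("Read", SpannerIO.read()
                .withInstanceId(INSTANCE_ID)
                .withDatabaseId(DATABASE_ID)
                .withTable(TABLE_NAME)
                .withColumns("price"))
            .apply("Extract Price", MapElements
                .into(TypeDescriptors.longs())
                .via((Struct struct) -> struct.getLong("price")))
            .apply("Calculate Mean", Mean.globally())
            .apply("Map to string", MapElements
                .into(TypeDescriptor.of(String.class))
                .via(Object::toString))
            .apply("Write", TextIO.write().to("/tmp/output"));

        // This call never returns with the default Spanner client version.
        pipeline.run().waitUntilFinish();
    }

    public static void main(String[] args) {
        runExample();
    }
}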
{code}
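To see where a pipeline that never ends is actually stuck, one option is to run it under a deadline and print all thread stacks if the deadline passes. The following is only a minimal sketch using the JDK alone: {{HangProbe}} and {{finishesWithin}} are hypothetical names, and the empty task in {{main}} is a stand-in for the call to {{runExample()}} above.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class HangProbe {
    /**
     * Runs the task on a daemon thread and waits up to timeoutMillis.
     * Returns true if the task finished, false if it was still running
     * at the deadline (in which case all thread stacks are printed).
     */
    static boolean finishesWithin(Runnable task, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "hang-probe");
            t.setDaemon(true); // do not keep the JVM alive if the task never returns
            return t;
        });
        Future<?> future = executor.submit(task);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // Show where every live thread is currently blocked.
            Thread.getAllStackTraces().forEach((thread, stack) -> {
                System.err.println("Thread " + thread.getName() + " (" + thread.getState() + ")");
                for (StackTraceElement frame : stack) {
                    System.err.println("    at " + frame);
                }
            });
            return false;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("task failed before the deadline", e);
        } finally {
            executor.shutdownNow(); // interrupt the worker thread if it is still running
        }
    }

    public static void main(String[] args) {
        // Stand-in task; replace with a call to the pipeline's entry point.
        Runnable task = () -> { };
        System.out.println("finished: " + finishesWithin(task, 1000));
    }
}
```

The stack traces printed on timeout should indicate whether the blocked threads are inside the Spanner client or elsewhere, which may help narrow down the version conflict.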
Following is my full list of dependencies:
{code:java}
repositories {
    mavenCentral()
}

ext {
    beamVersion = '2.11.0'
    sparkVersion = '2.3.3'
}

dependencies {
    compile "org.apache.beam:beam-sdks-java-core:$beamVersion"
    compile "org.apache.beam:beam-sdks-java-extensions-join-library:$beamVersion"
    compile "org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:$beamVersion"
    compile "org.apache.beam:beam-sdks-java-io-google-cloud-platform:$beamVersion"
    compile "org.apache.beam:beam-runners-core-java:$beamVersion"
    compile "org.apache.beam:beam-runners-direct-java:$beamVersion"
    compile "org.apache.beam:beam-runners-spark:$beamVersion"
    compile "org.apache.spark:spark-core_2.11:$sparkVersion"
    compile "org.apache.spark:spark-streaming_2.11:$sparkVersion"
    // This line fixed the issue for me
    // compile "com.google.cloud:google-cloud-spanner:1.11.0"
    testCompile "junit:junit:4.12"
}
{code}
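Since the behaviour depends on which _google-cloud-spanner_ version ends up on the classpath, it can help to check which JAR a Spanner class is actually loaded from. The following is a minimal sketch using only the JDK: {{ClasspathProbe}} and {{locate}} are hypothetical names, and {{com.google.cloud.spanner.Spanner}} is assumed here as a representative class from that artifact.

```java
import java.net.URL;
import java.security.CodeSource;

public class ClasspathProbe {
    /**
     * Returns the location (typically a JAR URL) that the named class is
     * loaded from, or a short note if it cannot be determined.
     */
    static String locate(String className) {
        try {
            // Load without running static initializers.
            Class<?> cls = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            URL location = (source == null) ? null : source.getLocation();
            return (location == null) ? "(bootstrap or unknown)" : location.toString();
        } catch (ClassNotFoundException e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // Assumed default: a class from the google-cloud-spanner artifact.
        String name = args.length > 0 ? args[0] : "com.google.cloud.spanner.Spanner";
        System.out.println(name + " -> " + locate(name));
    }
}
```

Running this on the same classpath as the pipeline, once with and once without the commented-out {{compile}} line, should show whether the JAR name (and therefore the version) changes.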
was:
Whenever my pipeline reads from Spanner, it never terminates. If I update
the Spanner dependency (_com.google.cloud:google-cloud-spanner_) to, e.g.,
_1.11.0_, then everything works as expected.
Consider the following simple pipeline, which never ends:
{code:java}
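import com.google.cloud.spanner.Struct;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Mean;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;

public class Prototype_Spanner {
    private static final String INSTANCE_ID = "XYZ";
    private static final String DATABASE_ID = "test_beam";
    private static final String TABLE_NAME = "item";

    private static void runExample() {
        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(DirectRunner.class);
        Pipeline pipeline = Pipeline.create(options);

        pipeline
            // Read the "price" column of every row in the table.
            .apply("Read", SpannerIO.read()
                .withInstanceId(INSTANCE_ID)
                .withDatabaseId(DATABASE_ID)
                .withTable(TABLE_NAME)
                .withColumns("price"))
            .apply("Extract Price", MapElements
                .into(TypeDescriptors.longs())
                .via((Struct struct) -> struct.getLong("price")))
            .apply("Calculate Mean", Mean.globally())
            .apply("Map to string", MapElements
                .into(TypeDescriptor.of(String.class))
                .via(Object::toString))
            .apply("Write", TextIO.write().to("/tmp/output"));

        // This call never returns with the default Spanner client version.
        pipeline.run().waitUntilFinish();
    }

    public static void main(String[] args) {
        runExample();
    }
}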
{code}
> Reading data from Spanner never ends
> ------------------------------------
>
> Key: BEAM-6913
> URL: https://issues.apache.org/jira/browse/BEAM-6913
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Affects Versions: 2.11.0
> Environment: macOS Mojave (10.14.3)
> Reporter: Mousa HAMAD
> Priority: Major
--
This message was sent by Atlassian JIRA (v7.6.3#76005)