[ 
https://issues.apache.org/jira/browse/BEAM-1751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15937330#comment-15937330
 ] 

peay edited comment on BEAM-1751 at 3/22/17 11:05 PM:
------------------------------------------------------

I can reproduce with:

{code:java}
public static void main(String[] args) throws URISyntaxException, IOException {
    MyOptions options = PipelineOptionsFactory
        .fromArgs(args)
        .as(MyOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    BigtableOptions bigtableOptions =
        new BigtableOptions.Builder()
            .setProjectId(options.getGoogleCloudProject())
            .setInstanceId(options.getBigtableInstance())
            .setUserAgent("dataflow")
            .build();

    PCollection<Row> properties = pipeline
        .apply(BigtableIO
                .read()
                .withBigtableOptions(bigtableOptions)
                .withTableId(options.getBigtablePropertiesTable()));

    pipeline.run();
  }
{code}

My original code uses the output of BigtableIO as a side input to an unbounded 
collection from KafkaIO, but I can reproduce it with this simple program that 
only has a bounded BigtableIO.

For the runner, I am using {{--maxNumWorkers=1 --streaming}} and that is 
essentially it.


was (Author: peay):
I can reproduce with:

{{code}}
public static void main(String[] args) throws URISyntaxException, IOException {
    MyOptions options = PipelineOptionsFactory
        .fromArgs(args)
        .as(MyOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    BigtableOptions bigtableOptions =
        new BigtableOptions.Builder()
            .setProjectId(options.getGoogleCloudProject())
            .setInstanceId(options.getBigtableInstance())
            .setUserAgent("dataflow")
            .build();

    PCollection<Row> properties = pipeline
        .apply(BigtableIO
                .read()
                .withBigtableOptions(bigtableOptions)
                .withTableId(options.getBigtablePropertiesTable()));

    pipeline.run();
  }
{{code}}

My original code uses the output of BigtableIO as a side input to an unbounded 
collection from KafkaIO, but I can reproduce it with this simple program that 
only has a bounded BigtableIO.

For the runner, I am using {{--maxNumWorkers=1 --streaming}} and that is 
essentially it.

> Singleton ByteKeyRange with BigtableIO and Dataflow runner
> ----------------------------------------------------------
>
>                 Key: BEAM-1751
>                 URL: https://issues.apache.org/jira/browse/BEAM-1751
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow, sdk-java-gcp
>    Affects Versions: 0.5.0
>            Reporter: peay
>            Assignee: Daniel Halperin
>
> I am getting this exception on a smallish table of a couple hundreds of rows 
> from Bigtable, when running on Dataflow with a single worker.
> This doesn't occur with the direct runner on my laptop, only when running on 
> Dataflow. Backtrace is from Beam 0.5.
> {code}java.lang.IllegalArgumentException: Start [xxxxxxxxxx] must be less 
> than end [xxxxxxxxxx]
>       at 
> org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>       at 
> org.apache.beam.sdk.io.range.ByteKeyRange.<init>(ByteKeyRange.java:288)
>       at 
> org.apache.beam.sdk.io.range.ByteKeyRange.withEndKey(ByteKeyRange.java:278)
>       at 
> org.apache.beam.sdk.io.gcp.bigtable.BigtableIO$BigtableSource.withEndKey(BigtableIO.java:728)
>       at 
> org.apache.beam.sdk.io.gcp.bigtable.BigtableIO$BigtableReader.splitAtFraction(BigtableIO.java:1034)
>       at 
> org.apache.beam.sdk.io.gcp.bigtable.BigtableIO$BigtableReader.splitAtFraction(BigtableIO.java:953)
>       at 
> org.apache.beam.runners.dataflow.DataflowUnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.getCheckpointMark(DataflowUnboundedReadFromBoundedSource.java:530)
>       at 
> org.apache.beam.runners.dataflow.DataflowUnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.getCheckpointMark(DataflowUnboundedReadFromBoundedSource.java:386)
>       at 
> org.apache.beam.runners.dataflow.DataflowUnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.getCheckpointMark(DataflowUnboundedReadFromBoundedSource.java:283)
>       at 
> com.google.cloud.dataflow.worker.runners.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:278)
>       at 
> com.google.cloud.dataflow.worker.runners.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:778)
>       at 
> com.google.cloud.dataflow.worker.runners.worker.StreamingDataflowWorker.access$700(StreamingDataflowWorker.java:105)
>       at 
> com.google.cloud.dataflow.worker.runners.worker.StreamingDataflowWorker$9.run(StreamingDataflowWorker.java:858)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> This is in the log right before:
> {code}
> "Proposing to split 
> ByteKeyRangeTracker{range=ByteKeyRange{startKey=[xxxxxxxxxx], endKey=[]}, 
> position=null} at fraction 0.0 (key [xxxxxxxxxx])"   
> {code}
> I have replaced the actual key with {{xxxxxxxxxx}}, but it is always the same 
> everywhere. In 
> https://github.com/apache/beam/blob/e68a70e08c9fe00df9ec163d1532da130f69588a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRange.java#L260,
>  the end position is obtained by truncating the fractional part of {{size * 
> fraction}}, such that the resulting offset can just be zero if {{fraction}} 
> is too small. `ByteKeyRange` does not allow a singleton range, however. Since 
> {{fraction}} is zero here, the call to {{splitAtFraction}} fails. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to