Aspen Barnes created BEAM-11140:
-----------------------------------

             Summary: IndexOutOfBoundsException with nested Row in SqlTransform
                 Key: BEAM-11140
                 URL: https://issues.apache.org/jira/browse/BEAM-11140
             Project: Beam
          Issue Type: Bug
          Components: dsl-sql
    Affects Versions: 2.25.0, 2.24.0, 2.23.0, 2.22.0, 2.21.0, 2.20.0, 2.19.0
         Environment: Beam 2.19.0-2.25.0 with sql extension. JDK 8.
            Reporter: Aspen Barnes


When querying against a nested Row using the SqlTransform, if the object is too 
deeply nested (depth >4) it will cause the calcite ExpressionWriter to throw 
IndexOutOfBoundsException. This level of nesting is common with protobufs and 
similar.

 
{code}
 java.lang.IndexOutOfBoundsException: Index: 20, Size: 20
        at java.util.ArrayList.rangeCheck(ArrayList.java:659)
        at java.util.ArrayList.get(ArrayList.java:435)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.begin(ExpressionWriter.java:78)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.begin(ExpressionWriter.java:101)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:73)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ConditionalStatement.accept0(ConditionalStatement.java:62)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodDeclaration.accept(MethodDeclaration.java:73)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.list(ExpressionWriter.java:167)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.NewExpression.accept(NewExpression.java:62)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.TernaryExpression.accept(TernaryExpression.java:61)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.GotoStatement.accept0(GotoStatement.java:84)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ConditionalStatement.accept0(ConditionalStatement.java:62)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodDeclaration.accept(MethodDeclaration.java:73)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.list(ExpressionWriter.java:167)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.NewExpression.accept(NewExpression.java:62)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.TernaryExpression.accept(TernaryExpression.java:61)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.GotoStatement.accept0(GotoStatement.java:84)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ConditionalStatement.accept0(ConditionalStatement.java:62)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodDeclaration.accept(MethodDeclaration.java:73)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.list(ExpressionWriter.java:167)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.NewExpression.accept(NewExpression.java:62)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.TernaryExpression.accept(TernaryExpression.java:61)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.GotoStatement.accept0(GotoStatement.java:84)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ConditionalStatement.accept0(ConditionalStatement.java:62)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodDeclaration.accept(MethodDeclaration.java:73)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.list(ExpressionWriter.java:167)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.NewExpression.accept(NewExpression.java:62)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.TernaryExpression.accept(TernaryExpression.java:61)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.GotoStatement.accept0(GotoStatement.java:84)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ConditionalStatement.accept0(ConditionalStatement.java:62)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodDeclaration.accept(MethodDeclaration.java:73)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.list(ExpressionWriter.java:167)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.NewExpression.accept(NewExpression.java:62)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.TernaryExpression.accept(TernaryExpression.java:61)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.UnaryExpression.accept(UnaryExpression.java:49)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.GotoStatement.accept0(GotoStatement.java:80)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
        at 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.AbstractNode.toString(AbstractNode.java:51)
        at 
org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:215)
        at 
org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:118)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:493)
        at 
org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:69)
        at 
org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:39)
        at 
org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:134)
        at 
org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:86)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:493)
        at 
org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:262)
        at 
com.etsy.r2k.unbounded.proc.sql.ProcessorTest.doTest(ProcessorTest.java:61)
        at 
com.etsy.r2k.unbounded.proc.sql.ProcessorTest.testRecursionError(ProcessorTest.java:37)
 {code}
Test Code:
{code:java}
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class ProcessorTest {
  @Rule public final TestPipeline pipeline = TestPipeline.create();

  @Before
  public void setup() {
    
pipeline.enableAutoRunIfMissing(false).enableAbandonedNodeEnforcement(false);
  }

  @Test
  public void testRecursion() {
    doTest(4);
  }

  @Test
  public void testRecursionError() {
    // TODO file bug
    doTest(5);
  }

  private void doTest(final int depth) {
    Schema schema = Schema.builder().addStringField("foo").build();

    Row row = Row.withSchema(schema).addValue("your boat").build();
    for (int i = 0; i < depth; i++) {
      schema = Schema.builder().addRowField(String.format("foo%d", i), 
schema).build();
      row = Row.withSchema(schema).addValue(row).build();
    }

    final TestStream<Row> stream =
        TestStream.create(schema).addElements(row).advanceWatermarkToInfinity();

    final PCollection<Row> input = pipeline.apply(stream);
    final PCollectionTuple tuple = PCollectionTuple.of("a", input);
    final StringBuilder selector = new StringBuilder();
    for (int i = depth - 1; i >= 0; i--) {
      selector.append(".`foo").append(i).append('`');
    }
    final SqlTransform transform = SqlTransform.query("SELECT a" + selector + 
".foo FROM a");
    final PCollection<String> results =
        tuple
            .apply("query", transform)
            .apply(
                MapElements.into(TypeDescriptors.strings())
                    .via((SerializableFunction<Row, String>) r -> 
r.getString("foo")));

    PAssert.that(results).containsInAnyOrder("your boat");
    pipeline.run().waitUntilFinish();
  }
}

{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to