Aspen Barnes created BEAM-11140:
-----------------------------------
Summary: IndexOutOfBoundsException with nested Row in SqlTransform
Key: BEAM-11140
URL: https://issues.apache.org/jira/browse/BEAM-11140
Project: Beam
Issue Type: Bug
Components: dsl-sql
Affects Versions: 2.25.0, 2.24.0, 2.23.0, 2.22.0, 2.21.0, 2.20.0, 2.19.0
Environment: Beam 2.19.0-2.25.0 with sql extension. JDK 8.
Reporter: Aspen Barnes
When querying against a nested Row using the SqlTransform, if the object is too
deeply nested (depth >4) it will cause the calcite ExpressionWriter to throw
IndexOutOfBoundsException. This level of nesting is common with protobufs and
similar.
Â
{code}
java.lang.IndexOutOfBoundsException: Index: 20, Size: 20
at java.util.ArrayList.rangeCheck(ArrayList.java:659)
at java.util.ArrayList.get(ArrayList.java:435)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.begin(ExpressionWriter.java:78)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.begin(ExpressionWriter.java:101)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:73)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ConditionalStatement.accept0(ConditionalStatement.java:62)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodDeclaration.accept(MethodDeclaration.java:73)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.list(ExpressionWriter.java:167)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.NewExpression.accept(NewExpression.java:62)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.TernaryExpression.accept(TernaryExpression.java:61)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.GotoStatement.accept0(GotoStatement.java:84)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ConditionalStatement.accept0(ConditionalStatement.java:62)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodDeclaration.accept(MethodDeclaration.java:73)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.list(ExpressionWriter.java:167)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.NewExpression.accept(NewExpression.java:62)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.TernaryExpression.accept(TernaryExpression.java:61)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.GotoStatement.accept0(GotoStatement.java:84)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ConditionalStatement.accept0(ConditionalStatement.java:62)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodDeclaration.accept(MethodDeclaration.java:73)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.list(ExpressionWriter.java:167)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.NewExpression.accept(NewExpression.java:62)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.TernaryExpression.accept(TernaryExpression.java:61)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.GotoStatement.accept0(GotoStatement.java:84)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ConditionalStatement.accept0(ConditionalStatement.java:62)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodDeclaration.accept(MethodDeclaration.java:73)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.list(ExpressionWriter.java:167)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.NewExpression.accept(NewExpression.java:62)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.TernaryExpression.accept(TernaryExpression.java:61)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.GotoStatement.accept0(GotoStatement.java:84)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ConditionalStatement.accept0(ConditionalStatement.java:62)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.append(ExpressionWriter.java:129)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodDeclaration.accept(MethodDeclaration.java:73)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.ExpressionWriter.list(ExpressionWriter.java:167)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.NewExpression.accept(NewExpression.java:62)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.TernaryExpression.accept(TernaryExpression.java:61)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.UnaryExpression.accept(UnaryExpression.java:49)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.MethodCallExpression.accept(MethodCallExpression.java:105)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.GotoStatement.accept0(GotoStatement.java:80)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.BlockStatement.accept0(BlockStatement.java:75)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Statement.accept(Statement.java:32)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.AbstractNode.toString(AbstractNode.java:51)
at
org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:215)
at
org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:118)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:493)
at
org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:69)
at
org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:39)
at
org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:134)
at
org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:86)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:493)
at
org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:262)
at
com.etsy.r2k.unbounded.proc.sql.ProcessorTest.doTest(ProcessorTest.java:61)
at
com.etsy.r2k.unbounded.proc.sql.ProcessorTest.testRecursionError(ProcessorTest.java:37)
{code}
Test Code:
{code:java}
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class ProcessorTest {
@Rule public final TestPipeline pipeline = TestPipeline.create();
@Before
public void setup() {
pipeline.enableAutoRunIfMissing(false).enableAbandonedNodeEnforcement(false);
}
@Test
public void testRecursion() {
doTest(4);
}
@Test
public void testRecursionError() {
// TODO file bug
doTest(5);
}
private void doTest(final int depth) {
Schema schema = Schema.builder().addStringField("foo").build();
Row row = Row.withSchema(schema).addValue("your boat").build();
for (int i = 0; i < depth; i++) {
schema = Schema.builder().addRowField(String.format("foo%d", i),
schema).build();
row = Row.withSchema(schema).addValue(row).build();
}
final TestStream<Row> stream =
TestStream.create(schema).addElements(row).advanceWatermarkToInfinity();
final PCollection<Row> input = pipeline.apply(stream);
final PCollectionTuple tuple = PCollectionTuple.of("a", input);
final StringBuilder selector = new StringBuilder();
for (int i = depth - 1; i >= 0; i--) {
selector.append(".`foo").append(i).append('`');
}
final SqlTransform transform = SqlTransform.query("SELECT a" + selector +
".foo FROM a");
final PCollection<String> results =
tuple
.apply("query", transform)
.apply(
MapElements.into(TypeDescriptors.strings())
.via((SerializableFunction<Row, String>) r ->
r.getString("foo")));
PAssert.that(results).containsInAnyOrder("your boat");
pipeline.run().waitUntilFinish();
}
}
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)